From faad1f26b1a0d2aa282e0245fbf1783b26953b65 Mon Sep 17 00:00:00 2001 From: gardners Date: Thu, 29 Nov 2012 15:38:04 +1030 Subject: [PATCH 01/33] Pass SID as well as ip:port through rhizome fetch request logic. This is in preparation for rhizome over MDP. --- rhizome.h | 8 +-- rhizome_direct_http.c | 4 +- rhizome_fetch.c | 134 ++++++++++++++++++++++++++++------------ rhizome_packetformats.c | 6 +- 4 files changed, 106 insertions(+), 46 deletions(-) diff --git a/rhizome.h b/rhizome.h index ab421cbf..c41be23e 100644 --- a/rhizome.h +++ b/rhizome.h @@ -326,8 +326,8 @@ int rhizome_sign_hash_with_key(rhizome_manifest *m,const unsigned char *sk, int rhizome_verify_bundle_privatekey(rhizome_manifest *m, const unsigned char *sk, const unsigned char *pk); int rhizome_find_bundle_author(rhizome_manifest *m); -int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, int timeout); -int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip); +int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, const unsigned char peersid[SID_SIZE], int timeout); +int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE]); /* one manifest is required per candidate, plus a few spare. so MAX_RHIZOME_MANIFESTS must be > MAX_CANDIDATES. @@ -335,7 +335,7 @@ int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in #define MAX_RHIZOME_MANIFESTS 24 #define MAX_CANDIDATES 16 -int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip); +int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE]); typedef struct rhizome_http_request { struct sched_ent alarm; @@ -552,7 +552,7 @@ enum rhizome_start_fetch_result { SLOTBUSY }; -enum rhizome_start_fetch_result rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length); +enum rhizome_start_fetch_result rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char sid[SID_SIZE],const unsigned char *prefix, size_t prefix_length); int rhizome_any_fetch_active(); int rhizome_any_fetch_queued(); diff --git a/rhizome_direct_http.c b/rhizome_direct_http.c index 7fc8f836..76c06ebd 100644 --- a/rhizome_direct_http.c +++ b/rhizome_direct_http.c @@ -731,6 +731,8 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r) DEBUGF("Dispatch size_high=%lld",r->cursor->size_high); rhizome_direct_transport_state_http *state = r->transport_specific_state; + unsigned char zerosid[SID_SIZE]="\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"; + int sock=socket(AF_INET, SOCK_STREAM, 0); if (sock==-1) { WHY_perror("socket"); @@ -876,7 +878,7 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r) Then as noted above, we can use that to pull the file down using existing routines. */ - if (!rhizome_fetch_request_manifest_by_prefix(&addr,&actionlist[i+1],RHIZOME_BAR_PREFIX_BYTES)) + if (!rhizome_fetch_request_manifest_by_prefix(&addr,zerosid,&actionlist[i+1],RHIZOME_BAR_PREFIX_BYTES)) { /* Fetching the manifest, and then using it to see if we want to fetch the file for import is all handled asynchronously, so just diff --git a/rhizome_fetch.c b/rhizome_fetch.c index 873e56d0..8da4872c 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -29,7 +29,13 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ struct rhizome_fetch_candidate { rhizome_manifest *manifest; - struct sockaddr_in peer; + + /* Address of node offering manifest. + Can be either IP+port for HTTP or it can be a SID + for MDP. */ + struct sockaddr_in peer_ipandport; + unsigned char peer_sid[SID_SIZE]; + int priority; }; @@ -39,20 +45,32 @@ struct rhizome_fetch_candidate { struct rhizome_fetch_slot { struct sched_ent alarm; // must be first element in struct rhizome_manifest *manifest; - struct sockaddr_in peer; + + struct sockaddr_in peer_ipandport; + unsigned char peer_sid[SID_SIZE]; + int state; #define RHIZOME_FETCH_FREE 0 #define RHIZOME_FETCH_CONNECTING 1 #define RHIZOME_FETCH_SENDINGHTTPREQUEST 2 #define RHIZOME_FETCH_RXHTTPHEADERS 3 #define RHIZOME_FETCH_RXFILE 4 +#define RHIZOME_FETCH_RXFILEMDP 5 FILE *file; char filename[1024]; + int64_t file_len; + int64_t file_ofs; + + /* HTTP transport specific elements */ char request[1024]; int request_len; int request_ofs; - int64_t file_len; - int64_t file_ofs; + + /* MDP transport specific elements */ + unsigned char bar[RHIZOME_BAR_BYTES]; + int barP; + unsigned char prefix[RHIZOME_MANIFEST_ID_BYTES]; + int prefix_length; }; /* Represents a queue of fetch candidates and a single active fetch for bundle payloads whose size @@ -380,7 +398,8 @@ int rhizome_manifest_version_cache_lookup(rhizome_manifest *m) typedef struct ignored_manifest { unsigned char bid[crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES]; - struct sockaddr_in peer; + struct sockaddr_in peer_ipandport; + unsigned char peer_sid[SID_SIZE]; time_ms_t timeout; } ignored_manifest; @@ -400,7 +419,7 @@ typedef struct ignored_manifest_cache { a collision is exceedingly remote */ ignored_manifest_cache ignored; -int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip) +int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char *peersid) { int bin = m->cryptoSignPublic[0]>>(8-IGNORED_BIN_BITS); int slot; @@ -419,7 +438,7 @@ int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in return 0; } -int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, int timeout) +int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, const unsigned char peersid[SID_SIZE], int timeout) { /* The supplied manifest from a given IP has errors, so remember that it isn't worth considering */ @@ -439,8 +458,11 @@ int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in /* ignore for a while */ ignored.bins[bin].m[slot].timeout=gettime_ms()+timeout; bcopy(peerip, - &ignored.bins[bin].m[slot].peer, + &ignored.bins[bin].m[slot].peer_ipandport, sizeof(struct sockaddr_in)); + bcopy(peersid, + ignored.bins[bin].m[slot].peer_sid, + SID_SIZE); return 0; } @@ -468,31 +490,35 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot) WHYF_perror("fopen(`%s`, \"w\")", slot->filename); goto bail; } - if (slot->peer.sin_family == AF_INET) { + if (slot->peer_ipandport.sin_family == AF_INET) { /* Transfer via HTTP over IPv4 */ if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) { WHY_perror("socket"); - goto bail; + goto bail_http; } if (set_nonblock(sock) == -1) - goto bail; + goto bail_http; char buf[INET_ADDRSTRLEN]; - if (inet_ntop(AF_INET, &slot->peer.sin_addr, buf, sizeof buf) == NULL) { + if (inet_ntop(AF_INET, &slot->peer_ipandport.sin_addr, buf, sizeof buf) == NULL) { buf[0] = '*'; buf[1] = '\0'; } - if (connect(sock, (struct sockaddr*)&slot->peer, sizeof slot->peer) == -1) { + if (connect(sock, (struct sockaddr*)&slot->peer_ipandport, + sizeof slot->peer_ipandport) == -1) { if (errno == EINPROGRESS) { if (debug & DEBUG_RHIZOME_RX) DEBUGF("connect() returned EINPROGRESS"); } else { - WHYF_perror("connect(%d, %s:%u)", sock, buf, ntohs(slot->peer.sin_port)); - goto bail; + WHYF_perror("connect(%d, %s:%u)", sock, buf, + ntohs(slot->peer_ipandport.sin_port)); + goto bail_http; } } if (debug & DEBUG_RHIZOME_RX) - DEBUGF("RHIZOME HTTP REQUEST family=%u addr=%s port=%u %s", - slot->peer.sin_family, buf, ntohs(slot->peer.sin_port), alloca_str_toprint(slot->request) + DEBUGF("RHIZOME HTTP REQUEST family=%u addr=%s sid=%s port=%u %s", + slot->peer_ipandport.sin_family, + alloca_tohex_sid(slot->peer_sid), + buf, ntohs(slot->peer_ipandport.sin_port), alloca_str_toprint(slot->request) ); slot->alarm.poll.fd = sock; slot->request_ofs = 0; @@ -511,10 +537,14 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot) slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT; schedule(&slot->alarm); return 0; - } else { - /* TODO: Fetch via overlay */ - WHY("Rhizome fetching via overlay not implemented"); } + + bail_http: + /* Fetch via overlay, either because no IP address was provided, or because + the connection/attempt to fetch via HTTP failed. */ + WHY("Rhizome fetching via overlay not implemented"); + slot->state=RHIZOME_FETCH_RXFILEMDP; + bail: if (sock != -1) close(sock); @@ -565,7 +595,7 @@ bail: * @author Andrew Bettison */ static enum rhizome_start_fetch_result -rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct sockaddr_in *peerip) +rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct sockaddr_in *peerip,unsigned const char *peersid) { if (slot->state != RHIZOME_FETCH_FREE) return SLOTBUSY; @@ -667,12 +697,20 @@ rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct // Start the fetch. //dump("peerip", peerip, sizeof *peerip); - slot->peer = *peerip; + + /* Prepare for fetching via HTTP */ + slot->peer_ipandport = *peerip; strbuf r = strbuf_local(slot->request, sizeof slot->request); strbuf_sprintf(r, "GET /rhizome/file/%s HTTP/1.0\r\n\r\n", m->fileHexHash); if (strbuf_overrun(r)) return WHY("request overrun"); slot->request_len = strbuf_len(r); + + /* Prepare for fetching via MDP */ + bcopy(peersid,slot->peer_sid,SID_SIZE); + rhizome_manifest_to_bar(m,slot->bar); + slot->barP=1; + if (!FORM_RHIZOME_IMPORT_PATH(slot->filename, "payload.%s", bid)) return -1; m->dataFileName = strdup(slot->filename); @@ -692,19 +730,30 @@ rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct * Returns -1 on error. */ enum rhizome_start_fetch_result -rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length) +rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, + const unsigned char peersid[SID_SIZE], + const unsigned char *prefix, size_t prefix_length) { assert(peerip); struct rhizome_fetch_slot *slot = rhizome_find_fetch_slot(MAX_MANIFEST_BYTES); if (slot == NULL) return SLOTBUSY; - slot->peer = *peerip; + + /* Prepare for fetching via HTTP */ + slot->peer_ipandport = *peerip; slot->manifest = NULL; strbuf r = strbuf_local(slot->request, sizeof slot->request); strbuf_sprintf(r, "GET /rhizome/manifestbyprefix/%s HTTP/1.0\r\n\r\n", alloca_tohex(prefix, prefix_length)); if (strbuf_overrun(r)) return WHY("request overrun"); slot->request_len = strbuf_len(r); + + /* Prepare for fetching via MDP */ + bcopy(peersid,slot->peer_sid,SID_SIZE); + bcopy(prefix,slot->prefix,prefix_length); + slot->prefix_length=prefix_length; + slot->barP=0; + if (!FORM_RHIZOME_IMPORT_PATH(slot->filename, "manifest.%s", alloca_tohex(prefix, prefix_length))) return -1; if (schedule_fetch(slot) == -1) { @@ -726,7 +775,7 @@ static void rhizome_start_next_queued_fetch(struct rhizome_fetch_slot *slot) int i = 0; struct rhizome_fetch_candidate *c; while (i < q->candidate_queue_size && (c = &q->candidate_queue[i])->manifest) { - int result = rhizome_fetch(slot, c->manifest, &c->peer); + int result = rhizome_fetch(slot, c->manifest, &c->peer_ipandport,c->peer_sid); switch (result) { case SLOTBUSY: return; @@ -781,7 +830,7 @@ static void rhizome_start_next_queued_fetches(struct sched_ent *alarm) * * @author Andrew Bettison */ -int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip) +int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE]) { IN(); const char *bid = alloca_tohex_bid(m->cryptoSignPublic); @@ -807,7 +856,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock if (rhizome_manifest_verify(m) != 0) { WHY("Error verifying manifest when considering for import"); /* Don't waste time looking at this manifest again for a while */ - rhizome_queue_ignore_manifest(m, peerip, 60000); + rhizome_queue_ignore_manifest(m, peerip, peersid, 60000); rhizome_manifest_free(m); RETURN(-1); } @@ -841,7 +890,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock if (!m->selfSigned && rhizome_manifest_verify(m)) { WHY("Error verifying manifest when considering queuing for import"); /* Don't waste time looking at this manifest again for a while */ - rhizome_queue_ignore_manifest(m, peerip, 60000); + rhizome_queue_ignore_manifest(m, peerip, peersid, 60000); rhizome_manifest_free(m); RETURN(-1); } @@ -867,15 +916,16 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock if (!m->selfSigned && rhizome_manifest_verify(m)) { WHY("Error verifying manifest when considering queuing for import"); /* Don't waste time looking at this manifest again for a while */ - rhizome_queue_ignore_manifest(m, peerip, 60000); + rhizome_queue_ignore_manifest(m, peerip, peersid, 60000); rhizome_manifest_free(m); RETURN(-1); } struct rhizome_fetch_candidate *c = rhizome_fetch_insert(qi, ci); c->manifest = m; - c->peer = *peerip; c->priority = priority; + c->peer_ipandport = *peerip; + bcopy(peersid,c->peer_sid,SID_SIZE); if (debug & DEBUG_RHIZOME_RX) { DEBUG("Rhizome fetch queues:"); @@ -987,13 +1037,19 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by if (slot->manifest) { // Were fetching payload, now we have it. if (!rhizome_import_received_bundle(slot->manifest)){ - char buf[INET_ADDRSTRLEN]; - if (inet_ntop(AF_INET, &slot->peer.sin_addr, buf, sizeof buf) == NULL) { - buf[0] = '*'; - buf[1] = '\0'; + if (slot->state==RHIZOME_FETCH_RXFILE) { + char buf[INET_ADDRSTRLEN]; + if (inet_ntop(AF_INET, &slot->peer_ipandport.sin_addr, buf, sizeof buf) == NULL) { + buf[0] = '*'; + buf[1] = '\0'; + } + INFOF("Completed http request from %s:%u for file %s", + buf, ntohs(slot->peer_ipandport.sin_port), + slot->manifest->fileHexHash); + } else { + INFOF("Completed MDP request from %s for file %s", + alloca_tohex_sid(slot->peer_sid), slot->manifest->fileHexHash); } - INFOF("Completed http request from %s:%u for file %s", - buf, ntohs(slot->peer.sin_port), slot->manifest->fileHexHash); } } else { /* This was to fetch the manifest, so now fetch the file if needed */ @@ -1007,8 +1063,10 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by rhizome_manifest_free(m); } else { DEBUGF("All looks good for importing manifest id=%s", alloca_tohex_bid(m->cryptoSignPublic)); - dump("slot->peer",&slot->peer,sizeof(slot->peer)); - rhizome_suggest_queue_manifest_import(m, &slot->peer); + dump("slot->peerip",&slot->peer_ipandport,sizeof(slot->peer_ipandport)); + dump("slot->peersid",&slot->peer_sid,sizeof(slot->peer_sid)); + rhizome_suggest_queue_manifest_import(m, &slot->peer_ipandport, + slot->peer_sid); } } } diff --git a/rhizome_packetformats.c b/rhizome_packetformats.c index dc1d05cc..f6f66038 100644 --- a/rhizome_packetformats.c +++ b/rhizome_packetformats.c @@ -416,7 +416,7 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long RETURN(0); } - if (rhizome_ignore_manifest_check(m, &httpaddr)) + if (rhizome_ignore_manifest_check(m, &httpaddr,f->source->sid)) { /* Ignoring manifest that has caused us problems recently */ if (1) WARNF("Ignoring manifest with errors: %s*", manifest_id_prefix); @@ -431,7 +431,7 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long } else { if (debug & DEBUG_RHIZOME_ADS) DEBUG("Not seen before."); - rhizome_suggest_queue_manifest_import(m, &httpaddr); + rhizome_suggest_queue_manifest_import(m, &httpaddr,f->source->sid); // the above function will free the manifest structure, make sure we don't free it again m=NULL; } @@ -442,7 +442,7 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long DEBUG("Unverified manifest has errors - so not processing any further."); /* Don't waste any time on this manifest in future attempts for at least a minute. */ - rhizome_queue_ignore_manifest(m, &httpaddr, 60000); + rhizome_queue_ignore_manifest(m, &httpaddr,f->source->sid, 60000); } if (m) { rhizome_manifest_free(m); From a07e5761c4bfc7fcec8cfdad0655e92c7aef5030 Mon Sep 17 00:00:00 2001 From: gardners Date: Thu, 29 Nov 2012 16:15:51 +1030 Subject: [PATCH 02/33] added code to try switching to mdp if http fetching fails for any reason. --- rhizome_fetch.c | 184 +++++++++++++++++++++++++++--------------------- 1 file changed, 104 insertions(+), 80 deletions(-) diff --git a/rhizome_fetch.c b/rhizome_fetch.c index 8da4872c..1ea36d17 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -990,6 +990,25 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot) return 0; } +static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) +{ + /* close socket and stop watching it */ + unwatch(&slot->alarm); + unschedule(&slot->alarm); + if (slot->alarm.poll.fd!=-1) { + close(slot->alarm.poll.fd); + slot->alarm.poll.fd = -1; + } + + /* Begin MDP fetch process. + 1. Send initial request. + 2. Set timeout for next request (if fetching a file). + 3. Set timeout for no traffic received. + */ + DEBUGF("Fetch via MDP not implemented"); + return rhizome_fetch_close(slot); +} + void rhizome_fetch_write(struct rhizome_fetch_slot *slot) { if (debug & DEBUG_RHIZOME_RX) @@ -997,7 +1016,7 @@ void rhizome_fetch_write(struct rhizome_fetch_slot *slot) int bytes = write_nonblock(slot->alarm.poll.fd, &slot->request[slot->request_ofs], slot->request_len-slot->request_ofs); if (bytes == -1) { WHY("Got error while sending HTTP request. Closing."); - rhizome_fetch_close(slot); + rhizome_fetch_switch_to_mdp(slot); } else { // reset timeout unschedule(&slot->alarm); @@ -1086,88 +1105,93 @@ void rhizome_fetch_poll(struct sched_ent *alarm) if (alarm->poll.revents & (POLLIN | POLLOUT)) { switch (slot->state) { - case RHIZOME_FETCH_CONNECTING: - case RHIZOME_FETCH_SENDINGHTTPREQUEST: - rhizome_fetch_write(slot); + case RHIZOME_FETCH_CONNECTING: + case RHIZOME_FETCH_SENDINGHTTPREQUEST: + rhizome_fetch_write(slot); + return; + case RHIZOME_FETCH_RXFILEMDP: + if (debug & DEBUG_RHIZOME_RX) + DEBUG("Fetching via MDP not implemented"); + rhizome_fetch_close(slot); + break; + case RHIZOME_FETCH_RXFILE: { + /* Keep reading until we have the promised amount of data */ + char buffer[8192]; + sigPipeFlag = 0; + int bytes = read_nonblock(slot->alarm.poll.fd, buffer, sizeof buffer); + /* If we got some data, see if we have found the end of the HTTP request */ + if (bytes > 0) { + rhizome_write_content(slot, buffer, bytes); return; - case RHIZOME_FETCH_RXFILE: { - /* Keep reading until we have the promised amount of data */ - char buffer[8192]; - sigPipeFlag = 0; - int bytes = read_nonblock(slot->alarm.poll.fd, buffer, sizeof buffer); - /* If we got some data, see if we have found the end of the HTTP request */ - if (bytes > 0) { - rhizome_write_content(slot, buffer, bytes); - return; - } else { - if (debug & DEBUG_RHIZOME_RX) - DEBUG("Empty read, closing connection"); - rhizome_fetch_close(slot); - return; - } - if (sigPipeFlag) { - if (debug & DEBUG_RHIZOME_RX) - DEBUG("Received SIGPIPE, closing connection"); - rhizome_fetch_close(slot); - return; - } - } - break; - case RHIZOME_FETCH_RXHTTPHEADERS: { - /* Keep reading until we have two CR/LFs in a row */ - sigPipeFlag = 0; - int bytes = read_nonblock(slot->alarm.poll.fd, &slot->request[slot->request_len], 1024 - slot->request_len - 1); - /* If we got some data, see if we have found the end of the HTTP reply */ - if (bytes > 0) { - // reset timeout - unschedule(&slot->alarm); - slot->alarm.alarm = gettime_ms() + RHIZOME_IDLE_TIMEOUT; - slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT; - schedule(&slot->alarm); - slot->request_len += bytes; - if (http_header_complete(slot->request, slot->request_len, bytes)) { - if (debug & DEBUG_RHIZOME_RX) - DEBUGF("Got HTTP reply: %s", alloca_toprint(160, slot->request, slot->request_len)); - /* We have all the reply headers, so parse them, taking care of any following bytes of - content. */ - struct http_response_parts parts; - if (unpack_http_response(slot->request, &parts) == -1) { - if (debug & DEBUG_RHIZOME_RX) - DEBUGF("Failed HTTP request: failed to unpack http response"); - rhizome_fetch_close(slot); - return; - } - if (parts.code != 200) { - if (debug & DEBUG_RHIZOME_RX) - DEBUGF("Failed HTTP request: rhizome server returned %d != 200 OK", parts.code); - rhizome_fetch_close(slot); - return; - } - if (parts.content_length == -1) { - if (debug & DEBUG_RHIZOME_RX) - DEBUGF("Invalid HTTP reply: missing Content-Length header"); - rhizome_fetch_close(slot); - return; - } - slot->file_len = parts.content_length; - /* We have all we need. The file is already open, so just write out any initial bytes of - the body we read. - */ - slot->state = RHIZOME_FETCH_RXFILE; - int content_bytes = slot->request + slot->request_len - parts.content_start; - if (content_bytes > 0){ - rhizome_write_content(slot, parts.content_start, content_bytes); - return; - } - } - } - break; - default: + } else { + if (debug & DEBUG_RHIZOME_RX) + DEBUG("Empty read, closing connection"); + rhizome_fetch_switch_to_mdp(slot); + return; + } + if (sigPipeFlag) { + if (debug & DEBUG_RHIZOME_RX) + DEBUG("Received SIGPIPE, closing connection"); + rhizome_fetch_switch_to_mdp(slot); + return; + } + } + break; + case RHIZOME_FETCH_RXHTTPHEADERS: { + /* Keep reading until we have two CR/LFs in a row */ + sigPipeFlag = 0; + int bytes = read_nonblock(slot->alarm.poll.fd, &slot->request[slot->request_len], 1024 - slot->request_len - 1); + /* If we got some data, see if we have found the end of the HTTP reply */ + if (bytes > 0) { + // reset timeout + unschedule(&slot->alarm); + slot->alarm.alarm = gettime_ms() + RHIZOME_IDLE_TIMEOUT; + slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT; + schedule(&slot->alarm); + slot->request_len += bytes; + if (http_header_complete(slot->request, slot->request_len, bytes)) { if (debug & DEBUG_RHIZOME_RX) - DEBUG("Closing rhizome fetch connection due to illegal/unimplemented state."); - rhizome_fetch_close(slot); - return; + DEBUGF("Got HTTP reply: %s", alloca_toprint(160, slot->request, slot->request_len)); + /* We have all the reply headers, so parse them, taking care of any following bytes of + content. */ + struct http_response_parts parts; + if (unpack_http_response(slot->request, &parts) == -1) { + if (debug & DEBUG_RHIZOME_RX) + DEBUGF("Failed HTTP request: failed to unpack http response"); + rhizome_fetch_switch_to_mdp(slot); + return; + } + if (parts.code != 200) { + if (debug & DEBUG_RHIZOME_RX) + DEBUGF("Failed HTTP request: rhizome server returned %d != 200 OK", parts.code); + rhizome_fetch_switch_to_mdp(slot); + return; + } + if (parts.content_length == -1) { + if (debug & DEBUG_RHIZOME_RX) + DEBUGF("Invalid HTTP reply: missing Content-Length header"); + rhizome_fetch_switch_to_mdp(slot); + return; + } + slot->file_len = parts.content_length; + /* We have all we need. The file is already open, so just write out any initial bytes of + the body we read. + */ + slot->state = RHIZOME_FETCH_RXFILE; + int content_bytes = slot->request + slot->request_len - parts.content_start; + if (content_bytes > 0){ + rhizome_write_content(slot, parts.content_start, content_bytes); + return; + } } + } + break; + default: + if (debug & DEBUG_RHIZOME_RX) + DEBUG("Closing rhizome fetch connection due to illegal/unimplemented state."); + rhizome_fetch_close(slot); + return; + } } } if (alarm->poll.revents==0 || alarm->poll.revents & (POLLHUP | POLLERR)){ From 629a36f041e2d8c5fda3ecdfe550b8566bf903f3 Mon Sep 17 00:00:00 2001 From: gardners Date: Thu, 29 Nov 2012 16:43:19 +1030 Subject: [PATCH 03/33] call switch to mdp fetch if http request setup fails. --- rhizome_fetch.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/rhizome_fetch.c b/rhizome_fetch.c index 1ea36d17..b84d09ad 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -73,6 +73,8 @@ struct rhizome_fetch_slot { int prefix_length; }; +static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot); + /* Represents a queue of fetch candidates and a single active fetch for bundle payloads whose size * is less than a given threshold. * @@ -543,7 +545,9 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot) /* Fetch via overlay, either because no IP address was provided, or because the connection/attempt to fetch via HTTP failed. */ WHY("Rhizome fetching via overlay not implemented"); - slot->state=RHIZOME_FETCH_RXFILEMDP; + slot->state=RHIZOME_FETCH_RXFILEMDP; + rhizome_fetch_switch_to_mdp(slot); + return 0; bail: if (sock != -1) From c05ebada8cfa0b51103e3a3c66473f38f00633ba Mon Sep 17 00:00:00 2001 From: gardners Date: Thu, 29 Nov 2012 20:39:30 +1030 Subject: [PATCH 04/33] more work on rhizome over MDP. --- rhizome_fetch.c | 58 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/rhizome_fetch.c b/rhizome_fetch.c index b84d09ad..555f908f 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -71,6 +71,13 @@ struct rhizome_fetch_slot { int barP; unsigned char prefix[RHIZOME_MANIFEST_ID_BYTES]; int prefix_length; + int64_t mdpNextTX; + int64_t mdpLastRX; + int mdpIdleTimeout; + int64_t mdpRXWindowStart; + int mdpRXBlockLength; + uint32_t mdpRXBitmap; + unsigned char mdpRXWindow[32*200]; }; static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot); @@ -994,6 +1001,32 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot) return 0; } +static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) +{ + if ((gettime_ms()-slot->mdpLastRX)>slot->mdpIdleTimeout) { + // connection timed out + return rhizome_fetch_close(slot); + } + slot->mdpNextTX=gettime_ms()+133; + + DEBUGF("Send MDP frame asking for blocks of data"); + + return 0; +} + +static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot) +{ + if ((gettime_ms()-slot->mdpLastRX)>slot->mdpIdleTimeout) { + // connection timed out + return rhizome_fetch_close(slot); + } + slot->mdpNextTX=gettime_ms()+100; + + DEBUGF("Send MDP frame asking for manifest"); + + return 0; +} + static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) { /* close socket and stop watching it */ @@ -1009,6 +1042,31 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) 2. Set timeout for next request (if fetching a file). 3. Set timeout for no traffic received. */ + + slot->mdpLastRX=gettime_ms(); + if (slot->barP) { + /* We are requesting a file. The http request may have already received + some of the file, so take that into account when setting up ring buffer. + Then send the request for the next block of data, and set our alarm to + re-ask in a little while. "In a little while" is 133ms, which is roughly + the time it takes to send 16KB via WiFi broadcast at the 1Mbit base rate + (this will need tuning for non-WiFi interfaces). 16KB ~= 32 x 200 bytes + which is the block size we will use. 200bytes allows for several blocks + to fit into a packet, and probably fit at least one any any outgoing packet + that is not otherwise full. */ + slot->mdpIdleTimeout=5000; // give up if nothing received for 5 seconds + slot->mdpRXWindowStart=slot->file_ofs; + slot->mdpRXBitmap=0x00000000; // no blocks received yet + rhizome_fetch_mdp_requestblocks(slot); + } else { + /* We are requesting a manifest, which is stateless, except that we eventually + give up. All we need to do now is send the request, and set our alarm to + try again in case we haven't heard anything back. */ + slot->mdpNextTX=gettime_ms()+100; + slot->mdpIdleTimeout=2000; // only try for two seconds + rhizome_fetch_mdp_requestmanifest(slot); + } + DEBUGF("Fetch via MDP not implemented"); return rhizome_fetch_close(slot); } From a665750f4cb46314a844cdee6b28247f112f3839 Mon Sep 17 00:00:00 2001 From: gardners Date: Thu, 29 Nov 2012 21:08:11 +1030 Subject: [PATCH 05/33] prepare and dispatch mdp frames for rhizome over mdp. still need to set bodies of frames, and schedule call backs, and handle the requests when received. --- constants.h | 2 ++ overlay_mdp.c | 2 ++ rhizome_fetch.c | 29 +++++++++++++++++++++++++++-- 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/constants.h b/constants.h index ab2ca433..e458c054 100644 --- a/constants.h +++ b/constants.h @@ -210,6 +210,8 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #define MDP_PORT_VOMP 0x10000002 #define MDP_PORT_DNALOOKUP 0x10000003 #define MDP_PORT_NOREPLY 0x10000000 +#define MDP_PORT_RHIZOME_REQUEST 0x10000004 +#define MDP_PORT_RHIZOME_RESPONSE 0x10000005 #define MDP_PORT_DIRECTORY 10 #define MDP_TYPE_MASK 0xff diff --git a/overlay_mdp.c b/overlay_mdp.c index f7614d7b..e3e0c650 100644 --- a/overlay_mdp.c +++ b/overlay_mdp.c @@ -608,6 +608,8 @@ int overlay_mdp_check_binding(struct subscriber *subscriber, int port, int userG case MDP_PORT_KEYMAPREQUEST: case MDP_PORT_VOMP: case MDP_PORT_DNALOOKUP: + case MDP_PORT_RHIZOME_RESPONSE: + case MDP_PORT_RHIZOME_REQUEST: return 0; } } diff --git a/rhizome_fetch.c b/rhizome_fetch.c index 555f908f..8f5f3768 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -24,6 +24,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include "rhizome.h" #include "str.h" #include "strbuf_helpers.h" +#include "overlay_address.h" /* Represents a queued fetch of a bundle payload, for which the manifest is already known. */ @@ -1009,7 +1010,19 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) } slot->mdpNextTX=gettime_ms()+133; - DEBUGF("Send MDP frame asking for blocks of data"); + overlay_mdp_frame mdp; + + bzero(&mdp,sizeof(mdp)); + struct subscriber *me=find_subscriber(NULL,0,0); + memcpy(me->sid,mdp.out.src.sid,SID_SIZE); + mdp.out.src.port=MDP_PORT_RHIZOME_RESPONSE; + bcopy(slot->peer_sid,mdp.out.dst.sid,SID_SIZE); + mdp.out.dst.port=MDP_PORT_RHIZOME_REQUEST; + DEBUGF("Set request manifest in MDP frame body"); + overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0); + + DEBUGF("Set callback function, and set alarm"); + // schedule(&slot->alarm, return 0; } @@ -1022,7 +1035,19 @@ static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot) } slot->mdpNextTX=gettime_ms()+100; - DEBUGF("Send MDP frame asking for manifest"); + overlay_mdp_frame mdp; + + bzero(&mdp,sizeof(mdp)); + struct subscriber *me=find_subscriber(NULL,0,0); + memcpy(me->sid,mdp.out.src.sid,SID_SIZE); + mdp.out.src.port=MDP_PORT_RHIZOME_RESPONSE; + bcopy(slot->peer_sid,mdp.out.dst.sid,SID_SIZE); + mdp.out.dst.port=MDP_PORT_RHIZOME_REQUEST; + DEBUGF("Set request manifest in MDP frame body"); + overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0); + + DEBUGF("Set callback function, and set alarm"); + // schedule(&slot->alarm, return 0; } From eb7524e0689f828da60c8b0571ca82a4c4311d4a Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 30 Nov 2012 11:48:00 +1030 Subject: [PATCH 06/33] factored out internal MDP services into a separate file, and made core case statement in that brief and clear. --- overlay_mdp.c | 114 +----------------------------- overlay_mdp_services.c | 155 +++++++++++++++++++++++++++++++++++++++++ rhizome_fetch.c | 30 +++++++- serval.h | 1 + sourcefiles.mk | 1 + 5 files changed, 185 insertions(+), 116 deletions(-) create mode 100644 overlay_mdp_services.c diff --git a/overlay_mdp.c b/overlay_mdp.c index e3e0c650..cffe1ef6 100644 --- a/overlay_mdp.c +++ b/overlay_mdp.c @@ -436,119 +436,7 @@ static int overlay_saw_mdp_frame(struct overlay_frame *frame, overlay_mdp_frame RETURN(WHY("Failed to pass received MDP frame to client")); } else { /* No socket is bound, ignore the packet ... except for magic sockets */ - switch(mdp->out.dst.port) { - case MDP_PORT_VOMP: - RETURN(vomp_mdp_received(mdp)); - case MDP_PORT_KEYMAPREQUEST: - /* Either respond with the appropriate SAS, or record this one if it - verifies out okay. */ - if (debug & DEBUG_MDPREQUESTS) - DEBUG("MDP_PORT_KEYMAPREQUEST"); - RETURN(keyring_mapping_request(keyring,mdp)); - case MDP_PORT_DNALOOKUP: /* attempt to resolve DID to SID */ - { - int cn=0,in=0,kp=0; - char did[64+1]; - int pll=mdp->out.payload_length; - if (pll>64) pll=64; - /* get did from the packet */ - if (mdp->out.payload_length<1) { - RETURN(WHY("Empty DID in DNA resolution request")); } - bcopy(&mdp->out.payload[0],&did[0],pll); - did[pll]=0; - - if (debug & DEBUG_MDPREQUESTS) - DEBUG("MDP_PORT_DNALOOKUP"); - - int results=0; - while(keyring_find_did(keyring,&cn,&in,&kp,did)) - { - /* package DID and Name into reply (we include the DID because - it could be a wild-card DID search, but the SID is implied - in the source address of our reply). */ - if (keyring->contexts[cn]->identities[in]->keypairs[kp]->private_key_len > DID_MAXSIZE) - /* skip excessively long DID records */ - continue; - const unsigned char *packedSid = keyring->contexts[cn]->identities[in]->keypairs[0]->public_key; - const char *unpackedDid = (const char *) keyring->contexts[cn]->identities[in]->keypairs[kp]->private_key; - const char *name = (const char *)keyring->contexts[cn]->identities[in]->keypairs[kp]->public_key; - // URI is sid://SIDHEX/DID - strbuf b = strbuf_alloca(SID_STRLEN + DID_MAXSIZE + 10); - strbuf_puts(b, "sid://"); - strbuf_tohex(b, packedSid, SID_SIZE); - strbuf_puts(b, "/local/"); - strbuf_puts(b, unpackedDid); - overlay_mdp_dnalookup_reply(&mdp->out.src, packedSid, strbuf_str(b), unpackedDid, name); - kp++; - results++; - } - if (!results) { - /* No local results, so see if servald has been configured to use - a DNA-helper that can provide additional mappings. This provides - a generalised interface for resolving telephone numbers into URIs. - The first use will be for resolving DIDs to SIP addresses for - OpenBTS boxes run by the OTI/Commotion project. - - The helper is run asynchronously, and the replies will be delivered - when results become available, so this function will return - immediately, so as not to cause blockages and delays in servald. - */ - dna_helper_enqueue(mdp, did, mdp->out.src.sid); - monitor_tell_formatted(MONITOR_DNAHELPER, "LOOKUP:%s:%d:%s\n", alloca_tohex_sid(mdp->out.src.sid), mdp->out.src.port, did); - } - RETURN(0); - } - break; - case MDP_PORT_ECHO: /* well known ECHO port for TCP/UDP and now MDP */ - { - /* Echo is easy: we swap the sender and receiver addresses (and thus port - numbers) and send the frame back. */ - - /* Swap addresses */ - overlay_mdp_swap_src_dst(mdp); - mdp->out.ttl=0; - - /* Prevent echo:echo connections and the resulting denial of service from triggering endless pongs. */ - if (mdp->out.dst.port==MDP_PORT_ECHO) { - RETURN(WHY("echo loop averted")); - } - /* If the packet was sent to broadcast, then replace broadcast address - with our local address. For now just responds with first local address */ - if (is_sid_broadcast(mdp->out.src.sid)) - { - if (my_subscriber) - bcopy(my_subscriber->sid, - mdp->out.src.sid,SID_SIZE); - else - /* No local addresses, so put all zeroes */ - bzero(mdp->out.src.sid,SID_SIZE); - } - - /* Always send PONGs auth-crypted so that the receipient knows - that they are genuine, and so that we avoid the extra cost - of signing (which is slower than auth-crypting) */ - int preserved=mdp->packetTypeAndFlags; - mdp->packetTypeAndFlags&=~(MDP_NOCRYPT|MDP_NOSIGN); - - /* queue frame for delivery */ - overlay_mdp_dispatch(mdp,0 /* system generated */, - NULL,0); - mdp->packetTypeAndFlags=preserved; - - /* and switch addresses back around in case the caller was planning on - using MDP structure again (this happens if there is a loop-back reply - and the frame needs sending on, as happens with broadcasts. MDP ping - is a simple application where this occurs). */ - overlay_mdp_swap_src_dst(mdp); - - } - break; - default: - /* Unbound socket. We won't be sending ICMP style connection refused - messages, partly because they are a waste of bandwidth. */ - RETURN(WHYF("Received packet for which no listening process exists (MDP ports: src=%d, dst=%d", - mdp->out.src.port,mdp->out.dst.port)); - } + RETURN(overlay_mdp_try_interal_services(mdp)); } break; default: diff --git a/overlay_mdp_services.c b/overlay_mdp_services.c new file mode 100644 index 00000000..478e2c0c --- /dev/null +++ b/overlay_mdp_services.c @@ -0,0 +1,155 @@ +/* +Copyright (C) 2010-2012 Paul Gardner-Stephen, Serval Project. + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +*/ + +#include +#include "serval.h" +#include "str.h" +#include "strbuf.h" +#include "overlay_buffer.h" +#include "overlay_address.h" +#include "overlay_packet.h" +#include "mdp_client.h" +#include "crypto.h" + + +int overlay_mdp_service_dnalookup(overlay_mdp_frame *mdp) +{ + IN(); + int cn=0,in=0,kp=0; + char did[64+1]; + int pll=mdp->out.payload_length; + if (pll>64) pll=64; + /* get did from the packet */ + if (mdp->out.payload_length<1) { + RETURN(WHY("Empty DID in DNA resolution request")); } + bcopy(&mdp->out.payload[0],&did[0],pll); + did[pll]=0; + + if (debug & DEBUG_MDPREQUESTS) + DEBUG("MDP_PORT_DNALOOKUP"); + + int results=0; + while(keyring_find_did(keyring,&cn,&in,&kp,did)) + { + /* package DID and Name into reply (we include the DID because + it could be a wild-card DID search, but the SID is implied + in the source address of our reply). */ + if (keyring->contexts[cn]->identities[in]->keypairs[kp]->private_key_len > DID_MAXSIZE) + /* skip excessively long DID records */ + continue; + const unsigned char *packedSid = keyring->contexts[cn]->identities[in]->keypairs[0]->public_key; + const char *unpackedDid = (const char *) keyring->contexts[cn]->identities[in]->keypairs[kp]->private_key; + const char *name = (const char *)keyring->contexts[cn]->identities[in]->keypairs[kp]->public_key; + // URI is sid://SIDHEX/DID + strbuf b = strbuf_alloca(SID_STRLEN + DID_MAXSIZE + 10); + strbuf_puts(b, "sid://"); + strbuf_tohex(b, packedSid, SID_SIZE); + strbuf_puts(b, "/local/"); + strbuf_puts(b, unpackedDid); + overlay_mdp_dnalookup_reply(&mdp->out.src, packedSid, strbuf_str(b), unpackedDid, name); + kp++; + results++; + } + if (!results) { + /* No local results, so see if servald has been configured to use + a DNA-helper that can provide additional mappings. This provides + a generalised interface for resolving telephone numbers into URIs. + The first use will be for resolving DIDs to SIP addresses for + OpenBTS boxes run by the OTI/Commotion project. + + The helper is run asynchronously, and the replies will be delivered + when results become available, so this function will return + immediately, so as not to cause blockages and delays in servald. + */ + dna_helper_enqueue(mdp, did, mdp->out.src.sid); + monitor_tell_formatted(MONITOR_DNAHELPER, "LOOKUP:%s:%d:%s\n", + alloca_tohex_sid(mdp->out.src.sid), mdp->out.src.port, + did); + } + RETURN(0); +} + +int overlay_mdp_service_echo(overlay_mdp_frame *mdp) +{ + /* Echo is easy: we swap the sender and receiver addresses (and thus port + numbers) and send the frame back. */ + IN(); + + /* Swap addresses */ + overlay_mdp_swap_src_dst(mdp); + mdp->out.ttl=0; + + /* Prevent echo:echo connections and the resulting denial of service from triggering endless pongs. */ + if (mdp->out.dst.port==MDP_PORT_ECHO) { + RETURN(WHY("echo loop averted")); + } + /* If the packet was sent to broadcast, then replace broadcast address + with our local address. For now just responds with first local address */ + if (is_sid_broadcast(mdp->out.src.sid)) + { + if (my_subscriber) + bcopy(my_subscriber->sid, + mdp->out.src.sid,SID_SIZE); + else + /* No local addresses, so put all zeroes */ + bzero(mdp->out.src.sid,SID_SIZE); + } + + /* Always send PONGs auth-crypted so that the receipient knows + that they are genuine, and so that we avoid the extra cost + of signing (which is slower than auth-crypting) */ + int preserved=mdp->packetTypeAndFlags; + mdp->packetTypeAndFlags&=~(MDP_NOCRYPT|MDP_NOSIGN); + + /* queue frame for delivery */ + overlay_mdp_dispatch(mdp,0 /* system generated */, + NULL,0); + mdp->packetTypeAndFlags=preserved; + + /* and switch addresses back around in case the caller was planning on + using MDP structure again (this happens if there is a loop-back reply + and the frame needs sending on, as happens with broadcasts. MDP ping + is a simple application where this occurs). */ + overlay_mdp_swap_src_dst(mdp); + RETURN(0); +} + +int overlay_mdp_try_interal_services(overlay_mdp_frame *mdp) +{ + IN(); + switch(mdp->out.dst.port) { + case MDP_PORT_VOMP: + RETURN(vomp_mdp_received(mdp)); + case MDP_PORT_KEYMAPREQUEST: + /* Either respond with the appropriate SAS, or record this one if it + verifies out okay. */ + if (debug & DEBUG_MDPREQUESTS) DEBUG("MDP_PORT_KEYMAPREQUEST"); + RETURN(keyring_mapping_request(keyring,mdp)); + case MDP_PORT_DNALOOKUP: /* attempt to resolve DID to SID */ + RETURN(overlay_mdp_service_dnalookup(mdp)); + break; + case MDP_PORT_ECHO: /* well known ECHO port for TCP/UDP and now MDP */ + RETURN(overlay_mdp_service_echo(mdp)); + break; + default: + /* Unbound socket. We won't be sending ICMP style connection refused + messages, partly because they are a waste of bandwidth. */ + RETURN(WHYF("Received packet for which no listening process exists (MDP ports: src=%d, dst=%d", + mdp->out.src.port,mdp->out.dst.port)); + } +} diff --git a/rhizome_fetch.c b/rhizome_fetch.c index 8f5f3768..b515f919 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -82,6 +82,8 @@ struct rhizome_fetch_slot { }; static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot); +static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot); +static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot); /* Represents a queue of fetch candidates and a single active fetch for bundle payloads whose size * is less than a given threshold. @@ -1002,6 +1004,20 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot) return 0; } +static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm) +{ + struct rhizome_fetch_slot *slot=(struct rhizome_fetch_slot*)alarm; + long long now=gettime_ms(); + if (now>slot->mdpIdleTimeout) { + rhizome_fetch_close(slot); + return; + } + if (slot->barP) + rhizome_fetch_mdp_requestmanifest(slot); + else + rhizome_fetch_mdp_requestblocks(slot); +} + static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) { if ((gettime_ms()-slot->mdpLastRX)>slot->mdpIdleTimeout) { @@ -1018,12 +1034,16 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) mdp.out.src.port=MDP_PORT_RHIZOME_RESPONSE; bcopy(slot->peer_sid,mdp.out.dst.sid,SID_SIZE); mdp.out.dst.port=MDP_PORT_RHIZOME_REQUEST; + mdp.out.ttl=1; + mdp.packetTypeAndFlags=MDP_TX; DEBUGF("Set request manifest in MDP frame body"); overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0); DEBUGF("Set callback function, and set alarm"); - // schedule(&slot->alarm, - + slot->alarm.function = rhizome_fetch_mdp_slot_callback; + slot->alarm.alarm=slot->mdpNextTX; + schedule(&slot->alarm); + return 0; } @@ -1043,11 +1063,15 @@ static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot) mdp.out.src.port=MDP_PORT_RHIZOME_RESPONSE; bcopy(slot->peer_sid,mdp.out.dst.sid,SID_SIZE); mdp.out.dst.port=MDP_PORT_RHIZOME_REQUEST; + mdp.out.ttl=1; + mdp.packetTypeAndFlags=MDP_TX; DEBUGF("Set request manifest in MDP frame body"); overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0); DEBUGF("Set callback function, and set alarm"); - // schedule(&slot->alarm, + slot->alarm.function = rhizome_fetch_mdp_slot_callback; + slot->alarm.alarm=slot->mdpNextTX; + schedule(&slot->alarm); return 0; } diff --git a/serval.h b/serval.h index 9f67b60c..8649f8e2 100644 --- a/serval.h +++ b/serval.h @@ -732,6 +732,7 @@ void overlay_dummy_poll(struct sched_ent *alarm); void overlay_route_tick(struct sched_ent *alarm); void server_shutdown_check(struct sched_ent *alarm); void overlay_mdp_poll(struct sched_ent *alarm); +int overlay_mdp_try_interal_services(overlay_mdp_frame *mdp); void fd_periodicstats(struct sched_ent *alarm); void rhizome_check_connections(struct sched_ent *alarm); diff --git a/sourcefiles.mk b/sourcefiles.mk index 99bc56e1..ff60275f 100644 --- a/sourcefiles.mk +++ b/sourcefiles.mk @@ -28,6 +28,7 @@ SERVAL_SOURCES = $(SERVAL_BASE)audiodevices.c \ $(SERVAL_BASE)overlay_interface.c \ $(SERVAL_BASE)overlay_queue.c \ $(SERVAL_BASE)overlay_mdp.c \ + $(SERVAL_BASE)overlay_mdp_services.c \ $(SERVAL_BASE)overlay_olsr.c \ $(SERVAL_BASE)overlay_packetformats.c \ $(SERVAL_BASE)overlay_payload.c \ From 7a4e3d20f83575ce80d5563f9dd3d9bf0887fc53 Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 30 Nov 2012 14:17:05 +1030 Subject: [PATCH 07/33] Actually send Rhizome over MDP request packets. Actually call appropriate callback when receiving Rhizome over MDP packets. --- overlay_mdp_services.c | 32 +++++++++++++++++++------------- packetformats.c | 21 +++++++++++++++++++++ rhizome_fetch.c | 22 +++++++++++++++++++--- serval.h | 4 ++++ 4 files changed, 63 insertions(+), 16 deletions(-) diff --git a/overlay_mdp_services.c b/overlay_mdp_services.c index 478e2c0c..a186bb54 100644 --- a/overlay_mdp_services.c +++ b/overlay_mdp_services.c @@ -26,6 +26,19 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include "mdp_client.h" #include "crypto.h" +int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp) +{ + IN(); + + RETURN(-1); +} + +int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp) +{ + IN(); + + RETURN(-1); +} int overlay_mdp_service_dnalookup(overlay_mdp_frame *mdp) { @@ -133,19 +146,12 @@ int overlay_mdp_try_interal_services(overlay_mdp_frame *mdp) { IN(); switch(mdp->out.dst.port) { - case MDP_PORT_VOMP: - RETURN(vomp_mdp_received(mdp)); - case MDP_PORT_KEYMAPREQUEST: - /* Either respond with the appropriate SAS, or record this one if it - verifies out okay. */ - if (debug & DEBUG_MDPREQUESTS) DEBUG("MDP_PORT_KEYMAPREQUEST"); - RETURN(keyring_mapping_request(keyring,mdp)); - case MDP_PORT_DNALOOKUP: /* attempt to resolve DID to SID */ - RETURN(overlay_mdp_service_dnalookup(mdp)); - break; - case MDP_PORT_ECHO: /* well known ECHO port for TCP/UDP and now MDP */ - RETURN(overlay_mdp_service_echo(mdp)); - break; + case MDP_PORT_VOMP: RETURN(vomp_mdp_received(mdp)); + case MDP_PORT_KEYMAPREQUEST: RETURN(keyring_mapping_request(keyring,mdp)); + case MDP_PORT_DNALOOKUP: RETURN(overlay_mdp_service_dnalookup(mdp)); + case MDP_PORT_ECHO: RETURN(overlay_mdp_service_echo(mdp)); + case MDP_PORT_RHIZOME_REQUEST: RETURN(overlay_mdp_service_rhizomerequest(mdp)); + case MDP_PORT_RHIZOME_RESPONSE: RETURN(overlay_mdp_service_rhizomeresponse(mdp)); default: /* Unbound socket. We won't be sending ICMP style connection refused messages, partly because they are a waste of bandwidth. */ diff --git a/packetformats.c b/packetformats.c index be8bbb9e..ea1bd24d 100644 --- a/packetformats.c +++ b/packetformats.c @@ -40,3 +40,24 @@ int packetOk(struct overlay_interface *interface, unsigned char *packet, size_t return WHY("Packet type not recognised."); } +void write_uint64(unsigned char *o,uint64_t v) +{ + int i; + for(i=0;i<8;i++) + { *(o++)=v&0xff; v=v>>8; } +} + +void write_uint32(unsigned char *o,uint32_t v) +{ + int i; + for(i=0;i<4;i++) + { *(o++)=v&0xff; v=v>>8; } +} + +void write_uint16(unsigned char *o,uint16_t v) +{ + int i; + for(i=0;i<2;i++) + { *(o++)=v&0xff; v=v>>8; } +} + diff --git a/rhizome_fetch.c b/rhizome_fetch.c index b515f919..aca2caf4 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -1036,10 +1036,17 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) mdp.out.dst.port=MDP_PORT_RHIZOME_REQUEST; mdp.out.ttl=1; mdp.packetTypeAndFlags=MDP_TX; - DEBUGF("Set request manifest in MDP frame body"); + + mdp.out.queue=OQ_ORDINARY; + mdp.out.payload_length=RHIZOME_BAR_BYTES+8+4+2; + bcopy(slot->bar,&mdp.out.payload[0],RHIZOME_BAR_BYTES); + + write_uint64(&mdp.out.payload[RHIZOME_BAR_BYTES],slot->mdpRXWindowStart); + write_uint32(&mdp.out.payload[RHIZOME_BAR_BYTES+8],slot->mdpRXBitmap); + write_uint16(&mdp.out.payload[RHIZOME_BAR_BYTES+8+4],slot->mdpRXBlockLength); + overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0); - DEBUGF("Set callback function, and set alarm"); slot->alarm.function = rhizome_fetch_mdp_slot_callback; slot->alarm.alarm=slot->mdpNextTX; schedule(&slot->alarm); @@ -1049,6 +1056,11 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot) { + if (slot->prefix_length<1||slot->prefix_length>32) { + // invalid request + return rhizome_fetch_close(slot); + } + if ((gettime_ms()-slot->mdpLastRX)>slot->mdpIdleTimeout) { // connection timed out return rhizome_fetch_close(slot); @@ -1065,7 +1077,11 @@ static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot) mdp.out.dst.port=MDP_PORT_RHIZOME_REQUEST; mdp.out.ttl=1; mdp.packetTypeAndFlags=MDP_TX; - DEBUGF("Set request manifest in MDP frame body"); + + mdp.out.queue=OQ_ORDINARY; + mdp.out.payload_length=slot->prefix_length; + bcopy(slot->prefix,&mdp.out.payload[0],slot->prefix_length); + overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0); DEBUGF("Set callback function, and set alarm"); diff --git a/serval.h b/serval.h index 8649f8e2..5be1ce15 100644 --- a/serval.h +++ b/serval.h @@ -765,4 +765,8 @@ void dump_stack(); int olsr_init_socket(void); int olsr_send(struct overlay_frame *frame); +void write_uint64(unsigned char *o,uint64_t v); +void write_uint16(unsigned char *o,uint16_t v); +void write_uint32(unsigned char *o,uint32_t v); + #endif // __SERVALD_SERVALD_H From b91e832ad7a83432bf8e6dbad9f771fe6b2ef946 Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 30 Nov 2012 14:47:27 +1030 Subject: [PATCH 08/33] added improved control of rhizome http and mdp servers via separate config options with their own predicate functions. --- overlay.c | 16 ++++++++-------- overlay_mdp_services.c | 21 ++++++++++++++------- rhizome.c | 32 ++++++++++++++++++++++++++++++-- rhizome.h | 7 +++++++ rhizome_http.c | 2 +- rhizome_packetformats.c | 2 +- 6 files changed, 61 insertions(+), 19 deletions(-) diff --git a/overlay.c b/overlay.c index 52bdbd6c..6d2cdf1c 100644 --- a/overlay.c +++ b/overlay.c @@ -124,16 +124,16 @@ schedule(&_sched_##X); } /* Get rhizome server started BEFORE populating fd list so that the server's listen socket is in the list for poll() */ - if (rhizome_enabled()) { - if (!rhizome_opendb()){ - /* Rhizome http server needs to know which callback to attach + if (is_rhizome_enabled()) rhizome_opendb(); + + /* Rhizome http server needs to know which callback to attach to client sockets, so provide it here, along with the name to appear in time accounting statistics. */ - rhizome_http_server_start(rhizome_server_parse_http_request, - "rhizome_server_parse_http_request", - RHIZOME_HTTP_PORT,RHIZOME_HTTP_PORT_MAX); - } - } + if (is_rhizome_http_enabled()) + rhizome_http_server_start(rhizome_server_parse_http_request, + "rhizome_server_parse_http_request", + RHIZOME_HTTP_PORT,RHIZOME_HTTP_PORT_MAX); + // start the dna helper if configured dna_helper_start(); diff --git a/overlay_mdp_services.c b/overlay_mdp_services.c index a186bb54..ac2586c5 100644 --- a/overlay_mdp_services.c +++ b/overlay_mdp_services.c @@ -24,6 +24,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include "overlay_address.h" #include "overlay_packet.h" #include "mdp_client.h" +#include "rhizome.h" #include "crypto.h" int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp) @@ -150,12 +151,18 @@ int overlay_mdp_try_interal_services(overlay_mdp_frame *mdp) case MDP_PORT_KEYMAPREQUEST: RETURN(keyring_mapping_request(keyring,mdp)); case MDP_PORT_DNALOOKUP: RETURN(overlay_mdp_service_dnalookup(mdp)); case MDP_PORT_ECHO: RETURN(overlay_mdp_service_echo(mdp)); - case MDP_PORT_RHIZOME_REQUEST: RETURN(overlay_mdp_service_rhizomerequest(mdp)); - case MDP_PORT_RHIZOME_RESPONSE: RETURN(overlay_mdp_service_rhizomeresponse(mdp)); - default: - /* Unbound socket. We won't be sending ICMP style connection refused - messages, partly because they are a waste of bandwidth. */ - RETURN(WHYF("Received packet for which no listening process exists (MDP ports: src=%d, dst=%d", - mdp->out.src.port,mdp->out.dst.port)); + case MDP_PORT_RHIZOME_REQUEST: + if (is_rhizome_mdp_server_running()) { + RETURN(overlay_mdp_service_rhizomerequest(mdp)); + } else break; + case MDP_PORT_RHIZOME_RESPONSE: + if (is_rhizome_mdp_server_running()) { + RETURN(overlay_mdp_service_rhizomeresponse(mdp)); + } else break; } + + /* Unbound socket. We won't be sending ICMP style connection refused + messages, partly because they are a waste of bandwidth. */ + RETURN(WHYF("Received packet for which no listening process exists (MDP ports: src=%d, dst=%d", + mdp->out.src.port,mdp->out.dst.port)); } diff --git a/rhizome.c b/rhizome.c index 04cf7300..dfbc7085 100644 --- a/rhizome.c +++ b/rhizome.c @@ -22,9 +22,37 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include "rhizome.h" #include -int rhizome_enabled() +int is_rhizome_enabled() { - return confValueGetBoolean("rhizome.enable", 1);; + return confValueGetBoolean("rhizome.enable", 1); +} + +int is_rhizome_http_enabled() +{ + return confValueGetBoolean("rhizome.enable", 1) + && confValueGetBoolean("rhizome.http.enable", 1) + && rhizome_db; +} + +int is_rhizome_mdp_enabled() +{ + return confValueGetBoolean("rhizome.enable", 1) + && confValueGetBoolean("rhizome.mdp.enable", 1) + && rhizome_db; +} + +int is_rhizome_mdp_server_running() +{ + return is_rhizome_mdp_enabled(); +} + +int is_rhizome_advertise_enabled() +{ + return confValueGetBoolean("rhizome.enable", 1) + && confValueGetBoolean("rhizome.advertise.enable", 1) + && rhizome_db + && ( is_rhizome_http_server_running() + ||is_rhizome_mdp_server_running()); } int rhizome_fetch_delay_ms() diff --git a/rhizome.h b/rhizome.h index c41be23e..6e72aac2 100644 --- a/rhizome.h +++ b/rhizome.h @@ -439,6 +439,13 @@ int rhizome_http_server_start(int (*http_parse_func)(rhizome_http_request *), const char *http_parse_func_description, int port_low,int port_high); +int is_rhizome_enabled(); +int is_rhizome_mdp_enabled(); +int is_rhizome_http_enabled(); +int is_rhizome_advertise_enabled(); +int is_rhizome_mdp_server_running(); +int is_rhizome_http_server_running(); + typedef struct rhizome_direct_bundle_cursor { /* Where the current fill started */ long long start_size_high; diff --git a/rhizome_http.c b/rhizome_http.c index 72c92b82..23c8451a 100644 --- a/rhizome_http.c +++ b/rhizome_http.c @@ -72,7 +72,7 @@ unsigned char favicon_bytes[]={ ,0,0,0,0,0,0,0,0,0,0,0,0,0,0}; int favicon_len=318; -int rhizome_http_server_running() +int is_rhizome_http_server_running() { return rhizome_server_socket != -1; } diff --git a/rhizome_packetformats.c b/rhizome_packetformats.c index f6f66038..2bee4cec 100644 --- a/rhizome_packetformats.c +++ b/rhizome_packetformats.c @@ -145,7 +145,7 @@ int overlay_rhizome_add_advertisements(int interface_number, struct overlay_buff XXX We will move all processing of Rhizome into a separate process so that the CPU delays caused by Rhizome verifying signatures isn't a problem. */ - if (!rhizome_http_server_running() || !rhizome_db) + if (!is_rhizome_advertise_enabled()) RETURN(0); int pass; From 9e29db827f9cc1b14f4e98c3064f9d7ff56ee41e Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 30 Nov 2012 15:02:34 +1030 Subject: [PATCH 09/33] added test for correct function of rhizome.http.enable config option. --- tests/rhizomeprotocol | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/rhizomeprotocol b/tests/rhizomeprotocol index 9314d935..2ccea213 100755 --- a/tests/rhizomeprotocol +++ b/tests/rhizomeprotocol @@ -89,6 +89,20 @@ test_FileTransfer() { assert_rhizome_received file2 } +doc_DisablingHTTPServer="Disabling HTTP rhizome transports works" +setup_DisablingHTTPServer() { + setup_common + set_instance +A + rhizome_add_file file1 + executeOk_servald config set rhizome.http.enable 0 + start_servald_instances +A +} +test_DisablingHTTPServer() { + local _logvar=LOG${2#+} + local _port=$($SED -n -e '/RHIZOME HTTP SERVER.*START/s/.*port=\([0-9]\{1,\}\).*/\1/p' "${!_logvar}" | $SED -n '$p') + assert --message="instance $2 Rhizome HTTP server not started" [ ! -n "$_port" ] +} + doc_FileTransferBig="Big new bundle transfers to one node" setup_FileTransferBig() { setup_common From 3c4abfb5666c75e3e18ea99e3f92dddddfc476b9 Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 30 Nov 2012 15:06:31 +1030 Subject: [PATCH 10/33] improved and simplified test for rhizome http not running --- tests/rhizomeprotocol | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/rhizomeprotocol b/tests/rhizomeprotocol index 2ccea213..e3409f22 100755 --- a/tests/rhizomeprotocol +++ b/tests/rhizomeprotocol @@ -98,9 +98,7 @@ setup_DisablingHTTPServer() { start_servald_instances +A } test_DisablingHTTPServer() { - local _logvar=LOG${2#+} - local _port=$($SED -n -e '/RHIZOME HTTP SERVER.*START/s/.*port=\([0-9]\{1,\}\).*/\1/p' "${!_logvar}" | $SED -n '$p') - assert --message="instance $2 Rhizome HTTP server not started" [ ! -n "$_port" ] + !rhizome_http_server_started } doc_FileTransferBig="Big new bundle transfers to one node" From 6c0e6ef1c2e48704238754196e2cf09eb6064f82 Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 30 Nov 2012 17:42:17 +1030 Subject: [PATCH 11/33] added extra tests for Rhizome over HTTP and MDP transports. HTTP transport works, MDP transport segfaults. To be examined. --- tests/rhizomeprotocol | 54 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/tests/rhizomeprotocol b/tests/rhizomeprotocol index e3409f22..08d3e0c4 100755 --- a/tests/rhizomeprotocol +++ b/tests/rhizomeprotocol @@ -101,6 +101,60 @@ test_DisablingHTTPServer() { !rhizome_http_server_started } +doc_HTTPTransport="Rhizome over HTTP transport" +setup_HTTPTransport() { + setup_common + set_instance +B + executeOk_servald config set rhizome.mdp.enable 0 + set_instance +A + executeOk_servald config set rhizome.mdp.enable 0 + rhizome_add_file file1 + start_servald_instances +A +B + foreach_instance +A assert_peers_are_instances +B + foreach_instance +B assert_peers_are_instances +A +} +test_HTTPTransport() { + wait_until bundle_received_by $BID:$VERSION +B + set_instance +B + executeOk_servald rhizome list '' + assert_rhizome_list --fromhere=0 file1 + assert_rhizome_received file1 + set_instance +A + rhizome_update_file file1 file2 + set_instance +B + wait_until bundle_received_by $BID:$VERSION +B + executeOk_servald rhizome list '' + assert_rhizome_list --fromhere=0 file2 + assert_rhizome_received file2 +} + +doc_MDPTransport="Rhizome over MDP transport" +setup_MDPTransport() { + setup_common + set_instance +B + executeOk_servald config set rhizome.http.enable 0 + set_instance +A + executeOk_servald config set rhizome.http.enable 0 + rhizome_add_file file1 + start_servald_instances +A +B + foreach_instance +A assert_peers_are_instances +B + foreach_instance +B assert_peers_are_instances +A +} +test_MDPTransport() { + wait_until bundle_received_by $BID:$VERSION +B + set_instance +B + executeOk_servald rhizome list '' + assert_rhizome_list --fromhere=0 file1 + assert_rhizome_received file1 + set_instance +A + rhizome_update_file file1 file2 + set_instance +B + wait_until bundle_received_by $BID:$VERSION +B + executeOk_servald rhizome list '' + assert_rhizome_list --fromhere=0 file2 + assert_rhizome_received file2 +} + doc_FileTransferBig="Big new bundle transfers to one node" setup_FileTransferBig() { setup_common From 5becae01369d4b73dc89247643992f5d5ac1b98d Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 30 Nov 2012 20:03:18 +1030 Subject: [PATCH 12/33] fixed segfault bug with obtaining own SID. --- rhizome_fetch.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rhizome_fetch.c b/rhizome_fetch.c index aca2caf4..3735123d 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -1029,8 +1029,7 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) overlay_mdp_frame mdp; bzero(&mdp,sizeof(mdp)); - struct subscriber *me=find_subscriber(NULL,0,0); - memcpy(me->sid,mdp.out.src.sid,SID_SIZE); + memcpy(my_subscriber->sid,mdp.out.src.sid,SID_SIZE); mdp.out.src.port=MDP_PORT_RHIZOME_RESPONSE; bcopy(slot->peer_sid,mdp.out.dst.sid,SID_SIZE); mdp.out.dst.port=MDP_PORT_RHIZOME_REQUEST; From 05745d9b9aa4e172e168af076a3dff7e2107bc2e Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 30 Nov 2012 20:03:53 +1030 Subject: [PATCH 13/33] same fix as previous commit, but in another place. --- rhizome_fetch.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rhizome_fetch.c b/rhizome_fetch.c index 3735123d..dc2db3bb 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -1069,8 +1069,7 @@ static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot) overlay_mdp_frame mdp; bzero(&mdp,sizeof(mdp)); - struct subscriber *me=find_subscriber(NULL,0,0); - memcpy(me->sid,mdp.out.src.sid,SID_SIZE); + memcpy(my_subscriber->sid,mdp.out.src.sid,SID_SIZE); mdp.out.src.port=MDP_PORT_RHIZOME_RESPONSE; bcopy(slot->peer_sid,mdp.out.dst.sid,SID_SIZE); mdp.out.dst.port=MDP_PORT_RHIZOME_REQUEST; From 11cce162dc0a2dac8612a44db8b2598bd70177c6 Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 30 Nov 2012 20:06:47 +1030 Subject: [PATCH 14/33] fixed bugs with preparing Rhizome MDP packets (source sid was not being set properly). --- rhizome_fetch.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/rhizome_fetch.c b/rhizome_fetch.c index dc2db3bb..27c98639 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -1029,7 +1029,7 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) overlay_mdp_frame mdp; bzero(&mdp,sizeof(mdp)); - memcpy(my_subscriber->sid,mdp.out.src.sid,SID_SIZE); + bcopy(my_subscriber->sid,mdp.out.src.sid,SID_SIZE); mdp.out.src.port=MDP_PORT_RHIZOME_RESPONSE; bcopy(slot->peer_sid,mdp.out.dst.sid,SID_SIZE); mdp.out.dst.port=MDP_PORT_RHIZOME_REQUEST; @@ -1044,6 +1044,9 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) write_uint32(&mdp.out.payload[RHIZOME_BAR_BYTES+8],slot->mdpRXBitmap); write_uint16(&mdp.out.payload[RHIZOME_BAR_BYTES+8+4],slot->mdpRXBlockLength); + DEBUGF("src sid=%s, dst sid=%s", + alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid)); + overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0); slot->alarm.function = rhizome_fetch_mdp_slot_callback; @@ -1069,7 +1072,7 @@ static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot) overlay_mdp_frame mdp; bzero(&mdp,sizeof(mdp)); - memcpy(my_subscriber->sid,mdp.out.src.sid,SID_SIZE); + bcopy(my_subscriber->sid,mdp.out.src.sid,SID_SIZE); mdp.out.src.port=MDP_PORT_RHIZOME_RESPONSE; bcopy(slot->peer_sid,mdp.out.dst.sid,SID_SIZE); mdp.out.dst.port=MDP_PORT_RHIZOME_REQUEST; From e5d5f3f750803738524967a73035f757f5fd2d64 Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 30 Nov 2012 20:20:33 +1030 Subject: [PATCH 15/33] fixed bugs with Rhizome over MDP requests. Now seems to output a series of MDP block requests as it should. --- rhizome_fetch.c | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/rhizome_fetch.c b/rhizome_fetch.c index 27c98639..c3a442d1 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -1008,20 +1008,24 @@ static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm) { struct rhizome_fetch_slot *slot=(struct rhizome_fetch_slot*)alarm; long long now=gettime_ms(); - if (now>slot->mdpIdleTimeout) { + if (now-slot->mdpLastRX>slot->mdpIdleTimeout) { + DEBUGF("MDP connection timed out"); rhizome_fetch_close(slot); return; } + DEBUGF("now-lastRX(0x%llx) <= idleTimeout(0x%llx)", + now-slot->mdpLastRX,slot->mdpIdleTimeout); if (slot->barP) - rhizome_fetch_mdp_requestmanifest(slot); - else rhizome_fetch_mdp_requestblocks(slot); + else + rhizome_fetch_mdp_requestmanifest(slot); } static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) { if ((gettime_ms()-slot->mdpLastRX)>slot->mdpIdleTimeout) { // connection timed out + DEBUGF("MDP connection timed out"); return rhizome_fetch_close(slot); } slot->mdpNextTX=gettime_ms()+133; @@ -1060,11 +1064,13 @@ static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot) { if (slot->prefix_length<1||slot->prefix_length>32) { // invalid request + DEBUGF("invalid MDP Rhizome request"); return rhizome_fetch_close(slot); } if ((gettime_ms()-slot->mdpLastRX)>slot->mdpIdleTimeout) { // connection timed out + DEBUGF("MDP connection timedout"); return rhizome_fetch_close(slot); } slot->mdpNextTX=gettime_ms()+100; @@ -1095,6 +1101,8 @@ static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot) static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) { + DEBUGF("Trying to switch to MDP for Rhizome fetch"); + /* close socket and stop watching it */ unwatch(&slot->alarm); unschedule(&slot->alarm); @@ -1109,6 +1117,7 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) 3. Set timeout for no traffic received. */ + DEBUGF("Preparing slot 0x%p: barP=%d",slot,slot->barP); slot->mdpLastRX=gettime_ms(); if (slot->barP) { /* We are requesting a file. The http request may have already received @@ -1133,8 +1142,7 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) rhizome_fetch_mdp_requestmanifest(slot); } - DEBUGF("Fetch via MDP not implemented"); - return rhizome_fetch_close(slot); + return 0; } void rhizome_fetch_write(struct rhizome_fetch_slot *slot) @@ -1143,8 +1151,9 @@ void rhizome_fetch_write(struct rhizome_fetch_slot *slot) DEBUGF("write_nonblock(%d, %s)", slot->alarm.poll.fd, alloca_toprint(-1, &slot->request[slot->request_ofs], slot->request_len-slot->request_ofs)); int bytes = write_nonblock(slot->alarm.poll.fd, &slot->request[slot->request_ofs], slot->request_len-slot->request_ofs); if (bytes == -1) { - WHY("Got error while sending HTTP request. Closing."); + WHY("Got error while sending HTTP request."); rhizome_fetch_switch_to_mdp(slot); + return; } else { // reset timeout unschedule(&slot->alarm); @@ -1217,6 +1226,7 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by } } } + DEBUGF("Closing rhizome fetch slot"); rhizome_fetch_close(slot); return; } From 692817ffbabfab89a5bbe3c8cce5399f922303f6 Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 30 Nov 2012 21:02:43 +1030 Subject: [PATCH 16/33] added functions for reading values from byte streams. --- packetformats.c | 23 +++++++++++++++++++++++ serval.h | 3 +++ 2 files changed, 26 insertions(+) diff --git a/packetformats.c b/packetformats.c index ea1bd24d..63b19e5b 100644 --- a/packetformats.c +++ b/packetformats.c @@ -61,3 +61,26 @@ void write_uint16(unsigned char *o,uint16_t v) { *(o++)=v&0xff; v=v>>8; } } +uint64_t read_uint64(unsigned char *o) +{ + int i; + uint64_t v=0; + for(i=0;i<8;i++) v=(v<<8)|*(o++); + return v; +} + +uint32_t read_uint32(unsigned char *o) +{ + int i; + uint32_t v=0; + for(i=0;i<4;i++) v=(v<<8)|*(o++); + return v; +} + +uint16_t read_uint16(unsigned char *o) +{ + int i; + uint16_t v=0; + for(i=0;i<2;i++) v=(v<<8)|*(o++); + return v; +} diff --git a/serval.h b/serval.h index 5be1ce15..4ed6d496 100644 --- a/serval.h +++ b/serval.h @@ -768,5 +768,8 @@ int olsr_send(struct overlay_frame *frame); void write_uint64(unsigned char *o,uint64_t v); void write_uint16(unsigned char *o,uint16_t v); void write_uint32(unsigned char *o,uint32_t v); +uint64_t read_uint64(unsigned char *o); +uint32_t read_uint32(unsigned char *o); +uint16_t read_uint16(unsigned char *o); #endif // __SERVALD_SERVALD_H From 979ca36ba6425e854e6132bbb1686b2e70f63bbc Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 30 Nov 2012 21:07:57 +1030 Subject: [PATCH 17/33] fixed byte order bug in read routines. --- packetformats.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packetformats.c b/packetformats.c index 63b19e5b..587bfffd 100644 --- a/packetformats.c +++ b/packetformats.c @@ -65,7 +65,7 @@ uint64_t read_uint64(unsigned char *o) { int i; uint64_t v=0; - for(i=0;i<8;i++) v=(v<<8)|*(o++); + for(i=0;i<8;i++) v=(v<<8)|o[8-1-i]; return v; } @@ -73,7 +73,7 @@ uint32_t read_uint32(unsigned char *o) { int i; uint32_t v=0; - for(i=0;i<4;i++) v=(v<<8)|*(o++); + for(i=0;i<4;i++) v=(v<<8)|o[4-1-i]; return v; } @@ -81,6 +81,6 @@ uint16_t read_uint16(unsigned char *o) { int i; uint16_t v=0; - for(i=0;i<2;i++) v=(v<<8)|*(o++); + for(i=0;i<2;i++) v=(v<<8)|o[2-1-i]; return v; } From 0c7419da5d15a41ba70b04897a2f60de52990445 Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 30 Nov 2012 21:08:06 +1030 Subject: [PATCH 18/33] changed bar to bid in rhizome mdp fetch elements, because that is probably the better way to address the content. Should add version to make sure. --- overlay_mdp_services.c | 10 ++++++++++ rhizome_fetch.c | 21 +++++++++++---------- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/overlay_mdp_services.c b/overlay_mdp_services.c index ac2586c5..c296db64 100644 --- a/overlay_mdp_services.c +++ b/overlay_mdp_services.c @@ -26,10 +26,20 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include "mdp_client.h" #include "rhizome.h" #include "crypto.h" +#include "log.h" int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp) { IN(); + DEBUGF("Someone sent me a rhizome request via MDP"); + DEBUGF("requestor sid = %s",alloca_tohex_sid(mdp->out.src.sid)); + DEBUGF("bundle ID = %s",alloca_tohex_bid(&mdp->out.payload[0])); + DEBUGF("file offset = 0x%llx", + read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES])); + DEBUGF("bitmap = 0x%08x", + read_uint32(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8])); + DEBUGF("block length = %d", + read_uint16(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+4])); RETURN(-1); } diff --git a/rhizome_fetch.c b/rhizome_fetch.c index c3a442d1..eb3bf1e6 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -68,8 +68,8 @@ struct rhizome_fetch_slot { int request_ofs; /* MDP transport specific elements */ - unsigned char bar[RHIZOME_BAR_BYTES]; - int barP; + unsigned char bid[RHIZOME_MANIFEST_ID_BYTES]; + int bidP; unsigned char prefix[RHIZOME_MANIFEST_ID_BYTES]; int prefix_length; int64_t mdpNextTX; @@ -722,8 +722,9 @@ rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct /* Prepare for fetching via MDP */ bcopy(peersid,slot->peer_sid,SID_SIZE); - rhizome_manifest_to_bar(m,slot->bar); - slot->barP=1; + bcopy(m->cryptoSignPublic,slot->bid,RHIZOME_MANIFEST_ID_BYTES); + DEBUGF("request bid=%s",alloca_tohex_bid(m->cryptoSignPublic)); + slot->bidP=1; if (!FORM_RHIZOME_IMPORT_PATH(slot->filename, "payload.%s", bid)) return -1; @@ -766,7 +767,7 @@ rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, bcopy(peersid,slot->peer_sid,SID_SIZE); bcopy(prefix,slot->prefix,prefix_length); slot->prefix_length=prefix_length; - slot->barP=0; + slot->bidP=0; if (!FORM_RHIZOME_IMPORT_PATH(slot->filename, "manifest.%s", alloca_tohex(prefix, prefix_length))) return -1; @@ -1015,7 +1016,7 @@ static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm) } DEBUGF("now-lastRX(0x%llx) <= idleTimeout(0x%llx)", now-slot->mdpLastRX,slot->mdpIdleTimeout); - if (slot->barP) + if (slot->bidP) rhizome_fetch_mdp_requestblocks(slot); else rhizome_fetch_mdp_requestmanifest(slot); @@ -1042,11 +1043,11 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) mdp.out.queue=OQ_ORDINARY; mdp.out.payload_length=RHIZOME_BAR_BYTES+8+4+2; - bcopy(slot->bar,&mdp.out.payload[0],RHIZOME_BAR_BYTES); + bcopy(slot->bid,&mdp.out.payload[0],RHIZOME_MANIFEST_ID_BYTES); write_uint64(&mdp.out.payload[RHIZOME_BAR_BYTES],slot->mdpRXWindowStart); write_uint32(&mdp.out.payload[RHIZOME_BAR_BYTES+8],slot->mdpRXBitmap); - write_uint16(&mdp.out.payload[RHIZOME_BAR_BYTES+8+4],slot->mdpRXBlockLength); + write_uint16(&mdp.out.payload[RHIZOME_BAR_BYTES+8+4],slot->mdpRXBlockLength); DEBUGF("src sid=%s, dst sid=%s", alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid)); @@ -1117,9 +1118,8 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) 3. Set timeout for no traffic received. */ - DEBUGF("Preparing slot 0x%p: barP=%d",slot,slot->barP); slot->mdpLastRX=gettime_ms(); - if (slot->barP) { + if (slot->bidP) { /* We are requesting a file. The http request may have already received some of the file, so take that into account when setting up ring buffer. Then send the request for the next block of data, and set our alarm to @@ -1132,6 +1132,7 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) slot->mdpIdleTimeout=5000; // give up if nothing received for 5 seconds slot->mdpRXWindowStart=slot->file_ofs; slot->mdpRXBitmap=0x00000000; // no blocks received yet + slot->mdpRXBlockLength=200; rhizome_fetch_mdp_requestblocks(slot); } else { /* We are requesting a manifest, which is stateless, except that we eventually From 1be28246e49f7c592b09cdd88583f504227aa02f Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 30 Nov 2012 21:13:29 +1030 Subject: [PATCH 19/33] we now pass bid+version in Rhizome over MDP block request. --- overlay_mdp_services.c | 8 +++++--- rhizome_fetch.c | 14 +++++++++----- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/overlay_mdp_services.c b/overlay_mdp_services.c index c296db64..f89c84cd 100644 --- a/overlay_mdp_services.c +++ b/overlay_mdp_services.c @@ -34,12 +34,14 @@ int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp) DEBUGF("Someone sent me a rhizome request via MDP"); DEBUGF("requestor sid = %s",alloca_tohex_sid(mdp->out.src.sid)); DEBUGF("bundle ID = %s",alloca_tohex_bid(&mdp->out.payload[0])); - DEBUGF("file offset = 0x%llx", + DEBUGF("manifest version = 0x%llx", read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES])); + DEBUGF("file offset = 0x%llx", + read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8])); DEBUGF("bitmap = 0x%08x", - read_uint32(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8])); + read_uint32(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8])); DEBUGF("block length = %d", - read_uint16(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+4])); + read_uint16(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8+4])); RETURN(-1); } diff --git a/rhizome_fetch.c b/rhizome_fetch.c index eb3bf1e6..c981e6a6 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -69,6 +69,7 @@ struct rhizome_fetch_slot { /* MDP transport specific elements */ unsigned char bid[RHIZOME_MANIFEST_ID_BYTES]; + int64_t bidVersion; int bidP; unsigned char prefix[RHIZOME_MANIFEST_ID_BYTES]; int prefix_length; @@ -723,7 +724,9 @@ rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct /* Prepare for fetching via MDP */ bcopy(peersid,slot->peer_sid,SID_SIZE); bcopy(m->cryptoSignPublic,slot->bid,RHIZOME_MANIFEST_ID_BYTES); - DEBUGF("request bid=%s",alloca_tohex_bid(m->cryptoSignPublic)); + slot->bidVersion=m->version; + DEBUGF("request bid=%s, version=0x%llx", + alloca_tohex_bid(slot->bid),slot->bidVersion); slot->bidP=1; if (!FORM_RHIZOME_IMPORT_PATH(slot->filename, "payload.%s", bid)) @@ -1042,12 +1045,13 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) mdp.packetTypeAndFlags=MDP_TX; mdp.out.queue=OQ_ORDINARY; - mdp.out.payload_length=RHIZOME_BAR_BYTES+8+4+2; + mdp.out.payload_length=RHIZOME_BAR_BYTES+8+8+4+2; bcopy(slot->bid,&mdp.out.payload[0],RHIZOME_MANIFEST_ID_BYTES); - write_uint64(&mdp.out.payload[RHIZOME_BAR_BYTES],slot->mdpRXWindowStart); - write_uint32(&mdp.out.payload[RHIZOME_BAR_BYTES+8],slot->mdpRXBitmap); - write_uint16(&mdp.out.payload[RHIZOME_BAR_BYTES+8+4],slot->mdpRXBlockLength); + write_uint64(&mdp.out.payload[RHIZOME_BAR_BYTES],slot->bidVersion); + write_uint64(&mdp.out.payload[RHIZOME_BAR_BYTES+8],slot->mdpRXWindowStart); + write_uint32(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8],slot->mdpRXBitmap); + write_uint16(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8+4],slot->mdpRXBlockLength); DEBUGF("src sid=%s, dst sid=%s", alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid)); From 1912f24da4681fbef90ebf0eaff2a660f3e96eca Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 30 Nov 2012 21:50:17 +1030 Subject: [PATCH 20/33] sending MDP Rhizome replies in response to requests now occurs, and replies are received, but not yet parsed. --- overlay_mdp_services.c | 93 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 87 insertions(+), 6 deletions(-) diff --git a/overlay_mdp_services.c b/overlay_mdp_services.c index f89c84cd..222faa44 100644 --- a/overlay_mdp_services.c +++ b/overlay_mdp_services.c @@ -36,12 +36,92 @@ int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp) DEBUGF("bundle ID = %s",alloca_tohex_bid(&mdp->out.payload[0])); DEBUGF("manifest version = 0x%llx", read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES])); - DEBUGF("file offset = 0x%llx", - read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8])); - DEBUGF("bitmap = 0x%08x", - read_uint32(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8])); - DEBUGF("block length = %d", - read_uint16(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8+4])); + uint64_t fileOffset= + read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8]); + DEBUGF("file offset = 0x%llx",fileOffset); + uint32_t bitmap= + read_uint32(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8]); + DEBUGF("bitmap = 0x%08x",bitmap); + uint16_t blockLength= + read_uint16(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8+4]); + DEBUGF("block length = %d",blockLength); + if (blockLength>300) RETURN(-1); + + /* Find manifest that corresponds to BID and version. + If we don't have this combination, then do nothing. + If we do have the combination, then find the associated file, + and open the blob so that we can send some of it. + + TODO: If we have a newer version of the manifest, and the manifest is a + journal, then the newer version is okay to use to service this request. + */ + long long row_id=-1; + if (sqlite_exec_int64(&row_id, "SELECT rowid FROM FILES WHERE id IN (SELECT filehash FROM MANIFESTS WHERE manifests.version=%lld AND manifests.id='%s');", + read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES]), + alloca_tohex_bid(&mdp->out.payload[0])) < 1) + { + DEBUGF("Couldn't find stored file."); + RETURN(-1); + } + DEBUGF("manifest file row_id = %lld",row_id); + + sqlite3_blob *blob=NULL; + int ret=sqlite3_blob_open(rhizome_db, "main", "files", "data", + row_id, 0 /* read only */, &blob); + if (ret!=SQLITE_OK) + { + DEBUGF("Failed to open blob: %s",sqlite3_errmsg(rhizome_db)); + RETURN(-1); + } + int blob_bytes=sqlite3_blob_bytes(blob); + if (blob_bytessid,reply.out.src.sid,SID_SIZE); + // send replies to broadcast so that others can hear blocks and record them + // (not that preemptive listening is implemented yet). + reply.out.src.port=MDP_PORT_RHIZOME_RESPONSE; + memset(reply.out.dst.sid,0xff,SID_SIZE); + reply.out.dst.port=MDP_PORT_RHIZOME_RESPONSE; + reply.out.queue=OQ_ORDINARY; + reply.out.payload[0]='B'; // reply contains blocks + // include 16 bytes of BID prefix for identification + bcopy(&mdp->out.payload[0],&reply.out.payload[0],16); + // and version of manifest + bcopy(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES], + &reply.out.payload[16],8); + + int i; + for(i=0;i<32;i++) + if (!(bitmap&(1<<(31-i)))) + { + // calculate and set offset of block + uint64_t blockOffset=fileOffset+i*blockLength; + write_uint64(&reply.out.payload[16+8],blockOffset); + // work out how many bytes to read + int blockBytes=blob_bytes-blockOffset; + if (blockBytes>blockLength) blockBytes=blockLength; + // read data for block + if (blob_bytes>=blockOffset) { + sqlite3_blob_read(blob,&reply.out.payload[16+8+8], + blockBytes,0); + reply.out.payload_length=16+8+8+blockBytes; + } + // send packet + overlay_mdp_dispatch(&reply,0 /* system generated */, NULL,0); + } + + sqlite3_blob_close(blob); blob=NULL; RETURN(-1); } @@ -49,6 +129,7 @@ int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp) int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp) { IN(); + DEBUGF("Someone sent me a rhizome REPLY via MDP"); RETURN(-1); } From 94e8a4b676f79e582fec2fbe4ae54a94723c2db9 Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 30 Nov 2012 21:52:44 +1030 Subject: [PATCH 21/33] fixed composition of rhizome over mdp reply packets. --- overlay_mdp_services.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/overlay_mdp_services.c b/overlay_mdp_services.c index 222faa44..b28c5916 100644 --- a/overlay_mdp_services.c +++ b/overlay_mdp_services.c @@ -96,26 +96,26 @@ int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp) reply.out.queue=OQ_ORDINARY; reply.out.payload[0]='B'; // reply contains blocks // include 16 bytes of BID prefix for identification - bcopy(&mdp->out.payload[0],&reply.out.payload[0],16); + bcopy(&mdp->out.payload[0],&reply.out.payload[1],16); // and version of manifest bcopy(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES], - &reply.out.payload[16],8); - + &reply.out.payload[1+16],8); + int i; for(i=0;i<32;i++) if (!(bitmap&(1<<(31-i)))) { // calculate and set offset of block uint64_t blockOffset=fileOffset+i*blockLength; - write_uint64(&reply.out.payload[16+8],blockOffset); + write_uint64(&reply.out.payload[1+16+8],blockOffset); // work out how many bytes to read int blockBytes=blob_bytes-blockOffset; if (blockBytes>blockLength) blockBytes=blockLength; // read data for block if (blob_bytes>=blockOffset) { - sqlite3_blob_read(blob,&reply.out.payload[16+8+8], + sqlite3_blob_read(blob,&reply.out.payload[1+16+8+8], blockBytes,0); - reply.out.payload_length=16+8+8+blockBytes; + reply.out.payload_length=1+16+8+8+blockBytes; } // send packet overlay_mdp_dispatch(&reply,0 /* system generated */, NULL,0); From ebbc8001cf0e5841c835b016e37bac1ff8f76e90 Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 30 Nov 2012 22:02:10 +1030 Subject: [PATCH 22/33] fixed bug with sending data blocks (was sending packets for beyond end of file). --- overlay_mdp_services.c | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/overlay_mdp_services.c b/overlay_mdp_services.c index b28c5916..a4bc72fe 100644 --- a/overlay_mdp_services.c +++ b/overlay_mdp_services.c @@ -74,6 +74,7 @@ int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp) RETURN(-1); } int blob_bytes=sqlite3_blob_bytes(blob); + DEBUGF("blob_bytes=%d",blob_bytes); if (blob_bytesblockLength) blockBytes=blockLength; + DEBUGF("blockBytes=%d, blockOffset=%d",blockBytes,blockOffset); // read data for block if (blob_bytes>=blockOffset) { sqlite3_blob_read(blob,&reply.out.payload[1+16+8+8], blockBytes,0); reply.out.payload_length=1+16+8+8+blockBytes; - } - // send packet - overlay_mdp_dispatch(&reply,0 /* system generated */, NULL,0); + // send packet + overlay_mdp_dispatch(&reply,0 /* system generated */, NULL,0); + } else break; } sqlite3_blob_close(blob); blob=NULL; @@ -130,6 +133,25 @@ int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp) { IN(); DEBUGF("Someone sent me a rhizome REPLY via MDP"); + + if (!mdp->out.payload_length) RETURN(-1); + + int type=mdp->out.payload[0]; + switch (type) { + case 'B': /* data block */ + { + if (mdp->out.payload_length<(1+16+8+8+1)) RETURN(-1); + unsigned char *bid=&mdp->out.payload[1]; + uint64_t version=read_uint64(&mdp->out.payload[1+16]); + uint64_t offset=read_uint64(&mdp->out.payload[1+16+8]); + int bytes=mdp->out.payload_length-(1+16+8+8); + DEBUGF("Received %d bytes @ 0x%llx for %s version 0x%llx", + bytes,offset,alloca_tohex_bid(bid),version); + RETURN(-1); + } + break; + } + RETURN(-1); } From 79c90dde6e0ef72c3f410640cc231e572253178b Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 30 Nov 2012 22:26:31 +1030 Subject: [PATCH 23/33] now rhizome mdp replies get parsed and the appropriate slot, if any, is located. --- overlay_mdp_services.c | 19 +++++++++++++++---- rhizome.h | 3 +++ rhizome_fetch.c | 23 +++++++++++++++++++++++ 3 files changed, 41 insertions(+), 4 deletions(-) diff --git a/overlay_mdp_services.c b/overlay_mdp_services.c index a4bc72fe..169cc706 100644 --- a/overlay_mdp_services.c +++ b/overlay_mdp_services.c @@ -141,12 +141,23 @@ int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp) case 'B': /* data block */ { if (mdp->out.payload_length<(1+16+8+8+1)) RETURN(-1); - unsigned char *bid=&mdp->out.payload[1]; + unsigned char *bidprefix=&mdp->out.payload[1]; uint64_t version=read_uint64(&mdp->out.payload[1+16]); uint64_t offset=read_uint64(&mdp->out.payload[1+16+8]); - int bytes=mdp->out.payload_length-(1+16+8+8); - DEBUGF("Received %d bytes @ 0x%llx for %s version 0x%llx", - bytes,offset,alloca_tohex_bid(bid),version); + int count=mdp->out.payload_length-(1+16+8+8); + unsigned char *bytes=&mdp->out.payload[1+16+8+8]; + DEBUGF("Received %d bytes @ 0x%llx for %s* version 0x%llx", + count,offset,alloca_tohex(bidprefix,16),version); + + /* Now see if there is a slot that matches. If so, then + see if the bytes are in the window, and write them. + + If there is not matching slot, then consider setting + a slot to capture this files as it is being requested + by someone else. + */ + rhizome_received_content(bidprefix,version,offset,count,bytes); + RETURN(-1); } break; diff --git a/rhizome.h b/rhizome.h index 6e72aac2..3c4eab97 100644 --- a/rhizome.h +++ b/rhizome.h @@ -428,6 +428,9 @@ struct http_response { unsigned long long content_length; const char * body; }; + +int rhizome_received_content(unsigned char *bid,uint64_t version, uint64_t offset, + int count,unsigned char *bytes); int rhizome_server_set_response(rhizome_http_request *r, const struct http_response *h); int rhizome_server_free_http_request(rhizome_http_request *r); int rhizome_server_http_send_bytes(rhizome_http_request *r); diff --git a/rhizome_fetch.c b/rhizome_fetch.c index c981e6a6..d432b983 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -1178,6 +1178,29 @@ void rhizome_fetch_write(struct rhizome_fetch_slot *slot) } } +int rhizome_received_content(unsigned char *bidprefix,uint64_t version, uint64_t offset, + int count,unsigned char *bytes) +{ + IN(); + int i; + for(i=0;i(slot->file_len-slot->file_ofs)) From dbb1fe8d1e7edb4d3d6db40d9c5e24983b6f8392 Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 30 Nov 2012 22:42:28 +1030 Subject: [PATCH 24/33] rhizome over mdp now writes content for in-order packets, and import gets triggered, but file hash currently doesn't match. --- overlay_mdp_services.c | 6 +++- rhizome.h | 5 ++-- rhizome_fetch.c | 65 +++++++++++++++++++++++++++--------------- 3 files changed, 50 insertions(+), 26 deletions(-) diff --git a/overlay_mdp_services.c b/overlay_mdp_services.c index 169cc706..665593d7 100644 --- a/overlay_mdp_services.c +++ b/overlay_mdp_services.c @@ -119,6 +119,9 @@ int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp) sqlite3_blob_read(blob,&reply.out.payload[1+16+8+8], blockBytes,0); reply.out.payload_length=1+16+8+8+blockBytes; + + // Mark terminal block if required + if (blockOffset+blockBytes==blob_bytes) reply.out.payload[0]='T'; // send packet overlay_mdp_dispatch(&reply,0 /* system generated */, NULL,0); } else break; @@ -139,6 +142,7 @@ int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp) int type=mdp->out.payload[0]; switch (type) { case 'B': /* data block */ + case 'T': /* terminal data block */ { if (mdp->out.payload_length<(1+16+8+8+1)) RETURN(-1); unsigned char *bidprefix=&mdp->out.payload[1]; @@ -156,7 +160,7 @@ int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp) a slot to capture this files as it is being requested by someone else. */ - rhizome_received_content(bidprefix,version,offset,count,bytes); + rhizome_received_content(bidprefix,version,offset,count,bytes,type); RETURN(-1); } diff --git a/rhizome.h b/rhizome.h index 3c4eab97..42e0a178 100644 --- a/rhizome.h +++ b/rhizome.h @@ -429,8 +429,9 @@ struct http_response { const char * body; }; -int rhizome_received_content(unsigned char *bid,uint64_t version, uint64_t offset, - int count,unsigned char *bytes); +int rhizome_received_content(unsigned char *bidprefix,uint64_t version, + uint64_t offset,int count,unsigned char *bytes, + int type); int rhizome_server_set_response(rhizome_http_request *r, const struct http_response *h); int rhizome_server_free_http_request(rhizome_http_request *r); int rhizome_server_http_send_bytes(rhizome_http_request *r); diff --git a/rhizome_fetch.c b/rhizome_fetch.c index d432b983..2619c4cd 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -1178,29 +1178,6 @@ void rhizome_fetch_write(struct rhizome_fetch_slot *slot) } } -int rhizome_received_content(unsigned char *bidprefix,uint64_t version, uint64_t offset, - int count,unsigned char *bytes) -{ - IN(); - int i; - for(i=0;i(slot->file_len-slot->file_ofs)) @@ -1265,6 +1242,48 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by schedule(&slot->alarm); } +int rhizome_received_content(unsigned char *bidprefix,uint64_t version, uint64_t offset, + int count,unsigned char *bytes,int type) +{ + IN(); + int i; + for(i=0;ifile_ofs==offset) { + debug=DEBUG_RHIZOME_RX; + /* We don't know the file length until we receive the last + block. If it isn't the last block, lie, and claim the end of + file is yet to come. */ + if (type=='T') slot->file_len=offset+count; + else slot->file_len=offset+count+1; + DEBUGF("Trying to write %d bytes @ %d (file len = %d)", + count,(int)slot->file_ofs,(int)slot->file_len); + rhizome_write_content(slot,(char *)bytes,count); + debug=0; + slot->mdpRXWindowStart=offset+count; + // TODO: Shift bitmap + RETURN(0); + } else { + // TODO: Implement out-of-order reception so that lost packets + // don't cause wastage + } + RETURN(0); + } + else + DEBUGF("Doesn't match this slot, because BIDs don't match: %s* vs %s", + alloca_tohex(bidprefix,16), + alloca_tohex_bid(rhizome_fetch_queues[i].active.bid)); + } + } + RETURN(-1); +} + void rhizome_fetch_poll(struct sched_ent *alarm) { struct rhizome_fetch_slot *slot = (struct rhizome_fetch_slot *) alarm; From 41261bf122f3323cc71c9ba872cf079443628fd8 Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 30 Nov 2012 22:45:27 +1030 Subject: [PATCH 25/33] gadzooks, rhizome over mdp appears to work. --- overlay_mdp_services.c | 2 +- rhizome_fetch.c | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/overlay_mdp_services.c b/overlay_mdp_services.c index 665593d7..4dac4572 100644 --- a/overlay_mdp_services.c +++ b/overlay_mdp_services.c @@ -117,7 +117,7 @@ int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp) // read data for block if (blob_bytes>=blockOffset) { sqlite3_blob_read(blob,&reply.out.payload[1+16+8+8], - blockBytes,0); + blockBytes,blockOffset); reply.out.payload_length=1+16+8+8+blockBytes; // Mark terminal block if required diff --git a/rhizome_fetch.c b/rhizome_fetch.c index 2619c4cd..196a95b6 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -1242,7 +1242,8 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by schedule(&slot->alarm); } -int rhizome_received_content(unsigned char *bidprefix,uint64_t version, uint64_t offset, +int rhizome_received_content(unsigned char *bidprefix, + uint64_t version, uint64_t offset, int count,unsigned char *bytes,int type) { IN(); @@ -1264,6 +1265,7 @@ int rhizome_received_content(unsigned char *bidprefix,uint64_t version, uint64_t else slot->file_len=offset+count+1; DEBUGF("Trying to write %d bytes @ %d (file len = %d)", count,(int)slot->file_ofs,(int)slot->file_len); + dump("content", bytes,count); rhizome_write_content(slot,(char *)bytes,count); debug=0; slot->mdpRXWindowStart=offset+count; From 7e6d836627b59198c3d8627ba143f5f8024f2370 Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 30 Nov 2012 22:55:10 +1030 Subject: [PATCH 26/33] remember to update timeout when storing data via rhizome mdp. --- rhizome_fetch.c | 1 + 1 file changed, 1 insertion(+) diff --git a/rhizome_fetch.c b/rhizome_fetch.c index 196a95b6..7aeadb4c 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -1269,6 +1269,7 @@ int rhizome_received_content(unsigned char *bidprefix, rhizome_write_content(slot,(char *)bytes,count); debug=0; slot->mdpRXWindowStart=offset+count; + slot->mdpLastRX=gettime_ms(); // TODO: Shift bitmap RETURN(0); } else { From 21f562122eb2330da029b91b7b400af6f36e8692 Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 30 Nov 2012 22:57:34 +1030 Subject: [PATCH 27/33] added test for rhizome over mdp for a big file, to make sure that timeout handling works properly. --- tests/rhizomeprotocol | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/tests/rhizomeprotocol b/tests/rhizomeprotocol index 08d3e0c4..45787b87 100755 --- a/tests/rhizomeprotocol +++ b/tests/rhizomeprotocol @@ -155,10 +155,36 @@ test_MDPTransport() { assert_rhizome_received file2 } -doc_FileTransferBig="Big new bundle transfers to one node" +doc_FileTransferBigMDP="Big new bundle transfers to one node via MDP" +setup_FileTransferBigMDP() { + setup_common + set_instance +B + executeOk_servald config set rhizome.http.enable 0 + set_instance +A + executeOk_servald config set rhizome.http.enable 0 + dd if=/dev/urandom of=file1 bs=1k count=1k 2>&1 + echo x >>file1 + ls -l file1 + rhizome_add_file file1 + start_servald_instances +A +B + foreach_instance +A assert_peers_are_instances +B + foreach_instance +B assert_peers_are_instances +A +} +test_FileTransferBigMDP() { + wait_until bundle_received_by $BID:$VERSION +B + set_instance +B + executeOk_servald rhizome list '' + assert_rhizome_list --fromhere=0 file1 + assert_rhizome_received file1 +} + +doc_FileTransferBig="Big new bundle transfers to one node via HTTP" setup_FileTransferBig() { setup_common + set_instance +B + executeOk_servald config set rhizome.mdp.enable 0 set_instance +A + executeOk_servald config set rhizome.mdp.enable 0 dd if=/dev/urandom of=file1 bs=1k count=1k 2>&1 echo x >>file1 ls -l file1 From ac734ff00d41060c3cea992f1ea46fcc0d668bb9 Mon Sep 17 00:00:00 2001 From: gardners Date: Sat, 1 Dec 2012 07:37:22 +1030 Subject: [PATCH 28/33] fixed bugs with rhizome over mdp receive for files >32*200 bytes long. also we request next block of data immediately after receiving all of the previous block. --- overlay_mdp_services.c | 6 +++--- rhizome_fetch.c | 49 +++++++++++++++++++++++++++++++----------- 2 files changed, 39 insertions(+), 16 deletions(-) diff --git a/overlay_mdp_services.c b/overlay_mdp_services.c index 4dac4572..c00db07f 100644 --- a/overlay_mdp_services.c +++ b/overlay_mdp_services.c @@ -135,7 +135,6 @@ int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp) int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp) { IN(); - DEBUGF("Someone sent me a rhizome REPLY via MDP"); if (!mdp->out.payload_length) RETURN(-1); @@ -150,8 +149,9 @@ int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp) uint64_t offset=read_uint64(&mdp->out.payload[1+16+8]); int count=mdp->out.payload_length-(1+16+8+8); unsigned char *bytes=&mdp->out.payload[1+16+8+8]; - DEBUGF("Received %d bytes @ 0x%llx for %s* version 0x%llx", - count,offset,alloca_tohex(bidprefix,16),version); + if (0) + DEBUGF("Received %d bytes @ 0x%llx for %s* version 0x%llx", + count,offset,alloca_tohex(bidprefix,16),version); /* Now see if there is a slot that matches. If so, then see if the bytes are in the window, and write them. diff --git a/rhizome_fetch.c b/rhizome_fetch.c index 7aeadb4c..d0227ba4 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -76,6 +76,7 @@ struct rhizome_fetch_slot { int64_t mdpNextTX; int64_t mdpLastRX; int mdpIdleTimeout; + int mdpResponsesReceived; int64_t mdpRXWindowStart; int mdpRXBlockLength; uint32_t mdpRXBitmap; @@ -977,7 +978,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock static int rhizome_fetch_close(struct rhizome_fetch_slot *slot) { - if (debug & DEBUG_RHIZOME_RX) + // if (debug & DEBUG_RHIZOME_RX) DEBUGF("close Rhizome fetch slot=%d", slotno(slot)); assert(slot->state != RHIZOME_FETCH_FREE); @@ -1032,6 +1033,11 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) DEBUGF("MDP connection timed out"); return rhizome_fetch_close(slot); } + // only issue new requests every 133ms. + // we automatically re-issue once we have received all packets in this + // request also, so if there is no packet loss, we can go substantially + // faster. Optimising behaviour when there is no packet loss is an + // outstanding task. slot->mdpNextTX=gettime_ms()+133; overlay_mdp_frame mdp; @@ -1053,11 +1059,17 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) write_uint32(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8],slot->mdpRXBitmap); write_uint16(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8+4],slot->mdpRXBlockLength); - DEBUGF("src sid=%s, dst sid=%s", - alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid)); + DEBUGF("src sid=%s, dst sid=%s, mdpRXWindowStart=0x%x", + alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid), + slot->mdpRXWindowStart); overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0); + // remember when we sent the request so that we can adjust the inter-request + // interval based on how fast the packets arrive. + slot->mdpResponsesReceived=0; + + unschedule(&slot->alarm); slot->alarm.function = rhizome_fetch_mdp_slot_callback; slot->alarm.alarm=slot->mdpNextTX; schedule(&slot->alarm); @@ -1253,8 +1265,6 @@ int rhizome_received_content(unsigned char *bidprefix, if (!bcmp(rhizome_fetch_queues[i].active.bid,bidprefix, 16)) { - DEBUGF("This response matches slot 0x%p", - rhizome_fetch_queues[i].active); struct rhizome_fetch_slot *slot=&rhizome_fetch_queues[i].active; if (slot->file_ofs==offset) { debug=DEBUG_RHIZOME_RX; @@ -1263,14 +1273,25 @@ int rhizome_received_content(unsigned char *bidprefix, file is yet to come. */ if (type=='T') slot->file_len=offset+count; else slot->file_len=offset+count+1; - DEBUGF("Trying to write %d bytes @ %d (file len = %d)", - count,(int)slot->file_ofs,(int)slot->file_len); - dump("content", bytes,count); rhizome_write_content(slot,(char *)bytes,count); debug=0; slot->mdpRXWindowStart=offset+count; - slot->mdpLastRX=gettime_ms(); + slot->mdpLastRX=gettime_ms(); + // TODO: Shift bitmap + + unschedule(&slot->alarm); + slot->alarm.function = rhizome_fetch_mdp_slot_callback; + slot->alarm.alarm=slot->mdpNextTX; + schedule(&slot->alarm); + + slot->mdpResponsesReceived++; + if (slot->mdpResponsesReceived==32) { + // We have received all responses, so immediately ask for more + rhizome_fetch_mdp_requestblocks(slot); + } + + RETURN(0); } else { // TODO: Implement out-of-order reception so that lost packets @@ -1279,11 +1300,13 @@ int rhizome_received_content(unsigned char *bidprefix, RETURN(0); } else - DEBUGF("Doesn't match this slot, because BIDs don't match: %s* vs %s", - alloca_tohex(bidprefix,16), - alloca_tohex_bid(rhizome_fetch_queues[i].active.bid)); + if (0) + DEBUGF("Doesn't match this slot, because BIDs don't match: %s* vs %s", + alloca_tohex(bidprefix,16), + alloca_tohex_bid(rhizome_fetch_queues[i].active.bid)); } - } + } + RETURN(-1); } From 55df66f6d6e82ef221d3835bec008d764676e6f0 Mon Sep 17 00:00:00 2001 From: gardners Date: Sat, 1 Dec 2012 07:43:18 +1030 Subject: [PATCH 29/33] culled some debug output. mostly works, but fetch slots can get muddled. --- rhizome_fetch.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/rhizome_fetch.c b/rhizome_fetch.c index d0227ba4..d2df08c4 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -1014,7 +1014,8 @@ static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm) struct rhizome_fetch_slot *slot=(struct rhizome_fetch_slot*)alarm; long long now=gettime_ms(); if (now-slot->mdpLastRX>slot->mdpIdleTimeout) { - DEBUGF("MDP connection timed out"); + DEBUGF("MDP connection timed out: last RX %lldms ago", + now-slot->mdpLastRX); rhizome_fetch_close(slot); return; } @@ -1059,9 +1060,10 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) write_uint32(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8],slot->mdpRXBitmap); write_uint16(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8+4],slot->mdpRXBlockLength); - DEBUGF("src sid=%s, dst sid=%s, mdpRXWindowStart=0x%x", - alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid), - slot->mdpRXWindowStart); + if (0) + DEBUGF("src sid=%s, dst sid=%s, mdpRXWindowStart=0x%x", + alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid), + slot->mdpRXWindowStart); overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0); From b7ae55c14300c1cadc092f459350807fca965d1c Mon Sep 17 00:00:00 2001 From: gardners Date: Sat, 1 Dec 2012 10:47:19 +1030 Subject: [PATCH 30/33] receiving multiple bundles via rhizome over mdp works now, although there is a bug with alarms being called on fetch slots after they complete, even though they apparently get unscheduled. --- rhizome_fetch.c | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/rhizome_fetch.c b/rhizome_fetch.c index d2df08c4..6217ab0f 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -979,7 +979,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock static int rhizome_fetch_close(struct rhizome_fetch_slot *slot) { // if (debug & DEBUG_RHIZOME_RX) - DEBUGF("close Rhizome fetch slot=%d", slotno(slot)); + DEBUGF("close Rhizome fetch slot=%d", slotno(slot)); assert(slot->state != RHIZOME_FETCH_FREE); /* close socket and stop watching it */ @@ -987,6 +987,7 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot) unschedule(&slot->alarm); close(slot->alarm.poll.fd); slot->alarm.poll.fd = -1; + slot->alarm.function=NULL; /* Free ephemeral data */ if (slot->file) @@ -1012,6 +1013,13 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot) static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm) { struct rhizome_fetch_slot *slot=(struct rhizome_fetch_slot*)alarm; + + if (slot->state!=5) { + DEBUGF("Stale alarm triggered on idle/reclaimed slot. Ignoring"); + unschedule(alarm); + return; + } + long long now=gettime_ms(); if (now-slot->mdpLastRX>slot->mdpIdleTimeout) { DEBUGF("MDP connection timed out: last RX %lldms ago", @@ -1019,8 +1027,8 @@ static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm) rhizome_fetch_close(slot); return; } - DEBUGF("now-lastRX(0x%llx) <= idleTimeout(0x%llx)", - now-slot->mdpLastRX,slot->mdpIdleTimeout); + DEBUGF("Timeout waiting for blocks. Resending request for slot=0x%p", + slot); if (slot->bidP) rhizome_fetch_mdp_requestblocks(slot); else @@ -1120,7 +1128,7 @@ static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot) static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) { - DEBUGF("Trying to switch to MDP for Rhizome fetch"); + DEBUGF("Trying to switch to MDP for Rhizome fetch: slot=0x%p",slot); /* close socket and stop watching it */ unwatch(&slot->alarm); @@ -1136,6 +1144,8 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) 3. Set timeout for no traffic received. */ + slot->state=RHIZOME_FETCH_RXFILEMDP; + slot->mdpLastRX=gettime_ms(); if (slot->bidP) { /* We are requesting a file. The http request may have already received @@ -1245,7 +1255,7 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by } } } - DEBUGF("Closing rhizome fetch slot"); + DEBUGF("Closing rhizome fetch slot = 0x%p",slot); rhizome_fetch_close(slot); return; } @@ -1263,11 +1273,11 @@ int rhizome_received_content(unsigned char *bidprefix, IN(); int i; for(i=0;istate==RHIZOME_FETCH_RXFILEMDP&&slot->bidP) { + if (!bcmp(slot->bid,bidprefix,16)) { - struct rhizome_fetch_slot *slot=&rhizome_fetch_queues[i].active; + if (slot->file_ofs==offset) { debug=DEBUG_RHIZOME_RX; /* We don't know the file length until we receive the last @@ -1302,8 +1312,8 @@ int rhizome_received_content(unsigned char *bidprefix, RETURN(0); } else - if (0) - DEBUGF("Doesn't match this slot, because BIDs don't match: %s* vs %s", + if (1) + DEBUGF("Doesn't match this slot = 0x%p, because BIDs don't match: %s* vs %s", alloca_tohex(bidprefix,16), alloca_tohex_bid(rhizome_fetch_queues[i].active.bid)); } From 115736f0a494dac15a16234b435a9565b0c7f76e Mon Sep 17 00:00:00 2001 From: gardners Date: Sat, 1 Dec 2012 10:52:08 +1030 Subject: [PATCH 31/33] cleaned out excess debugging messages and upped block size from 200 bytes to 1KB to greatly improve throughput. Will eventually want to make block size selection be based on network interface. --- overlay_mdp_services.c | 30 +++++++++++++++--------------- rhizome_fetch.c | 2 +- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/overlay_mdp_services.c b/overlay_mdp_services.c index c00db07f..f9a79d6e 100644 --- a/overlay_mdp_services.c +++ b/overlay_mdp_services.c @@ -31,21 +31,25 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp) { IN(); - DEBUGF("Someone sent me a rhizome request via MDP"); - DEBUGF("requestor sid = %s",alloca_tohex_sid(mdp->out.src.sid)); - DEBUGF("bundle ID = %s",alloca_tohex_bid(&mdp->out.payload[0])); - DEBUGF("manifest version = 0x%llx", - read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES])); + uint64_t fileOffset= read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8]); - DEBUGF("file offset = 0x%llx",fileOffset); uint32_t bitmap= read_uint32(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8]); - DEBUGF("bitmap = 0x%08x",bitmap); uint16_t blockLength= read_uint16(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8+4]); - DEBUGF("block length = %d",blockLength); - if (blockLength>300) RETURN(-1); + if (blockLength>1024) RETURN(-1); + + if(0) { + DEBUGF("Someone sent me a rhizome request via MDP"); + DEBUGF("requestor sid = %s",alloca_tohex_sid(mdp->out.src.sid)); + DEBUGF("bundle ID = %s",alloca_tohex_bid(&mdp->out.payload[0])); + DEBUGF("manifest version = 0x%llx", + read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES])); + DEBUGF("file offset = 0x%llx",fileOffset); + DEBUGF("bitmap = 0x%08x",bitmap); + DEBUGF("block length = %d",blockLength); + } /* Find manifest that corresponds to BID and version. If we don't have this combination, then do nothing. @@ -63,8 +67,7 @@ int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp) DEBUGF("Couldn't find stored file."); RETURN(-1); } - DEBUGF("manifest file row_id = %lld",row_id); - + sqlite3_blob *blob=NULL; int ret=sqlite3_blob_open(rhizome_db, "main", "files", "data", row_id, 0 /* read only */, &blob); @@ -73,8 +76,7 @@ int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp) DEBUGF("Failed to open blob: %s",sqlite3_errmsg(rhizome_db)); RETURN(-1); } - int blob_bytes=sqlite3_blob_bytes(blob); - DEBUGF("blob_bytes=%d",blob_bytes); + int blob_bytes=sqlite3_blob_bytes(blob); if (blob_bytesblockLength) blockBytes=blockLength; - DEBUGF("blockBytes=%d, blockOffset=%d",blockBytes,blockOffset); // read data for block if (blob_bytes>=blockOffset) { sqlite3_blob_read(blob,&reply.out.payload[1+16+8+8], diff --git a/rhizome_fetch.c b/rhizome_fetch.c index 6217ab0f..c89d17fa 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -1160,7 +1160,7 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) slot->mdpIdleTimeout=5000; // give up if nothing received for 5 seconds slot->mdpRXWindowStart=slot->file_ofs; slot->mdpRXBitmap=0x00000000; // no blocks received yet - slot->mdpRXBlockLength=200; + slot->mdpRXBlockLength=1024; // 200; rhizome_fetch_mdp_requestblocks(slot); } else { /* We are requesting a manifest, which is stateless, except that we eventually From 7b4b9dbf814405fa3b7bd215d81507d7017d61bc Mon Sep 17 00:00:00 2001 From: gardners Date: Sat, 1 Dec 2012 10:56:48 +1030 Subject: [PATCH 32/33] improved test coverage for Rhizome over MDP, and made testing of MDP vs HTTP more explicit for an existing test case. --- tests/rhizomeprotocol | 44 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/tests/rhizomeprotocol b/tests/rhizomeprotocol index 45787b87..b8b81db0 100755 --- a/tests/rhizomeprotocol +++ b/tests/rhizomeprotocol @@ -201,10 +201,20 @@ test_FileTransferBig() { assert_rhizome_received file1 } -doc_FileTransferMulti="New bundle transfers to four nodes" +doc_FileTransferMulti="New bundle transfers to four nodes via HTTP" setup_FileTransferMulti() { setup_common set_instance +A + executeOk_servald config set rhizome.mdp.enable 0 + set_instance +B + executeOk_servald config set rhizome.mdp.enable 0 + set_instance +C + executeOk_servald config set rhizome.mdp.enable 0 + set_instance +D + executeOk_servald config set rhizome.mdp.enable 0 + set_instance +E + executeOk_servald config set rhizome.mdp.enable 0 + set_instance +A rhizome_add_file file1 start_servald_instances +A +B +C +D +E foreach_instance +A assert_peers_are_instances +B +C +D +E @@ -222,6 +232,38 @@ test_FileTransferMulti() { done } +doc_FileTransferMultiMDP="New bundle transfers to four nodes via MDP" +setup_FileTransferMultiMDP() { + setup_common + set_instance +A + executeOk_servald config set rhizome.http.enable 0 + set_instance +B + executeOk_servald config set rhizome.http.enable 0 + set_instance +C + executeOk_servald config set rhizome.http.enable 0 + set_instance +D + executeOk_servald config set rhizome.http.enable 0 + set_instance +E + executeOk_servald config set rhizome.http.enable 0 + set_instance +A + rhizome_add_file file1 + start_servald_instances +A +B +C +D +E + foreach_instance +A assert_peers_are_instances +B +C +D +E + foreach_instance +B assert_peers_are_instances +A +C +D +E + foreach_instance +C assert_peers_are_instances +A +B +D +E + foreach_instance +D assert_peers_are_instances +A +B +C +E +} +test_FileTransferMultiMDP() { + wait_until bundle_received_by $BID:$VERSION +B +C +D +E + for i in B C D E; do + set_instance +$i + executeOk_servald rhizome list '' + assert_rhizome_list --fromhere=0 file1 + assert_rhizome_received file1 + done +} + + doc_FileTransferDelete="Payload deletion transfers to one node" setup_FileTransferDelete() { setup_common From 7ec695c9409ae4d8b00f225ee6be1cd4ed9a0b21 Mon Sep 17 00:00:00 2001 From: gardners Date: Mon, 3 Dec 2012 11:50:35 +1030 Subject: [PATCH 33/33] feedback from code review by Jeremy. Some preparation for out-of-order handling. Consolidated redundantly redundant fields in fetch slot structure. Probably fixed spurious alarm bug. --- overlay_mdp_services.c | 7 ++- rhizome_fetch.c | 97 ++++++++++++++++++------------------------ 2 files changed, 47 insertions(+), 57 deletions(-) diff --git a/overlay_mdp_services.c b/overlay_mdp_services.c index f9a79d6e..52ac6087 100644 --- a/overlay_mdp_services.c +++ b/overlay_mdp_services.c @@ -88,7 +88,12 @@ int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp) // for low devices. The result is that an attacker can prevent rhizome transfers // if they want to by injecting fake blocks. The alternative is to not broadcast // back replies, and then we can authcrypt. - reply.packetTypeAndFlags=MDP_TX|MDP_NOSIGN|MDP_NOCRYPT; + // multiple receivers starting at different times, we really need merkle-tree hashing. + // so multiple receivers is not realistic for now. So use non-broadcast unicode + // for now would seem the safest. But that would stop us from allowing multiple + // receivers in the special case where additional nodes begin listening in from the + // beginning. + reply.packetTypeAndFlags=MDP_TX|MDP_NOCRYPT|MDP_NOSIGN; reply.out.ttl=1; bcopy(my_subscriber->sid,reply.out.src.sid,SID_SIZE); // send replies to broadcast so that others can hear blocks and record them diff --git a/rhizome_fetch.c b/rhizome_fetch.c index c89d17fa..83d0f813 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -61,6 +61,7 @@ struct rhizome_fetch_slot { char filename[1024]; int64_t file_len; int64_t file_ofs; + int64_t last_write_time; /* HTTP transport specific elements */ char request[1024]; @@ -73,11 +74,8 @@ struct rhizome_fetch_slot { int bidP; unsigned char prefix[RHIZOME_MANIFEST_ID_BYTES]; int prefix_length; - int64_t mdpNextTX; - int64_t mdpLastRX; int mdpIdleTimeout; - int mdpResponsesReceived; - int64_t mdpRXWindowStart; + int mdpResponsesOutstanding; int mdpRXBlockLength; uint32_t mdpRXBitmap; unsigned char mdpRXWindow[32*200]; @@ -983,8 +981,8 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot) assert(slot->state != RHIZOME_FETCH_FREE); /* close socket and stop watching it */ - unwatch(&slot->alarm); unschedule(&slot->alarm); + unwatch(&slot->alarm); close(slot->alarm.poll.fd); slot->alarm.poll.fd = -1; slot->alarm.function=NULL; @@ -1021,14 +1019,15 @@ static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm) } long long now=gettime_ms(); - if (now-slot->mdpLastRX>slot->mdpIdleTimeout) { + if (now-slot->last_write_time>slot->mdpIdleTimeout) { DEBUGF("MDP connection timed out: last RX %lldms ago", - now-slot->mdpLastRX); + now-slot->last_write_time); rhizome_fetch_close(slot); return; } - DEBUGF("Timeout waiting for blocks. Resending request for slot=0x%p", - slot); + if (debug& DEBUG_RHIZOME_RX) + DEBUGF("Timeout waiting for blocks. Resending request for slot=0x%p", + slot); if (slot->bidP) rhizome_fetch_mdp_requestblocks(slot); else @@ -1037,17 +1036,11 @@ static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm) static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) { - if ((gettime_ms()-slot->mdpLastRX)>slot->mdpIdleTimeout) { - // connection timed out - DEBUGF("MDP connection timed out"); - return rhizome_fetch_close(slot); - } // only issue new requests every 133ms. // we automatically re-issue once we have received all packets in this // request also, so if there is no packet loss, we can go substantially // faster. Optimising behaviour when there is no packet loss is an // outstanding task. - slot->mdpNextTX=gettime_ms()+133; overlay_mdp_frame mdp; @@ -1064,24 +1057,25 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) bcopy(slot->bid,&mdp.out.payload[0],RHIZOME_MANIFEST_ID_BYTES); write_uint64(&mdp.out.payload[RHIZOME_BAR_BYTES],slot->bidVersion); - write_uint64(&mdp.out.payload[RHIZOME_BAR_BYTES+8],slot->mdpRXWindowStart); + write_uint64(&mdp.out.payload[RHIZOME_BAR_BYTES+8],slot->file_ofs); write_uint32(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8],slot->mdpRXBitmap); write_uint16(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8+4],slot->mdpRXBlockLength); if (0) DEBUGF("src sid=%s, dst sid=%s, mdpRXWindowStart=0x%x", alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid), - slot->mdpRXWindowStart); + slot->file_ofs); overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0); // remember when we sent the request so that we can adjust the inter-request // interval based on how fast the packets arrive. - slot->mdpResponsesReceived=0; + slot->mdpResponsesOutstanding=32; // TODO: set according to bitmap unschedule(&slot->alarm); slot->alarm.function = rhizome_fetch_mdp_slot_callback; - slot->alarm.alarm=slot->mdpNextTX; + slot->alarm.alarm=gettime_ms()+133; + slot->alarm.deadline=slot->alarm.alarm+500; schedule(&slot->alarm); return 0; @@ -1095,12 +1089,11 @@ static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot) return rhizome_fetch_close(slot); } - if ((gettime_ms()-slot->mdpLastRX)>slot->mdpIdleTimeout) { + if ((gettime_ms()-slot->last_write_time)>slot->mdpIdleTimeout) { // connection timed out DEBUGF("MDP connection timedout"); return rhizome_fetch_close(slot); } - slot->mdpNextTX=gettime_ms()+100; overlay_mdp_frame mdp; @@ -1120,7 +1113,8 @@ static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot) DEBUGF("Set callback function, and set alarm"); slot->alarm.function = rhizome_fetch_mdp_slot_callback; - slot->alarm.alarm=slot->mdpNextTX; + slot->alarm.alarm=gettime_ms()+100; + slot->alarm.deadline=slot->alarm.alarm+500; schedule(&slot->alarm); return 0; @@ -1146,7 +1140,7 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) slot->state=RHIZOME_FETCH_RXFILEMDP; - slot->mdpLastRX=gettime_ms(); + slot->last_write_time=gettime_ms(); if (slot->bidP) { /* We are requesting a file. The http request may have already received some of the file, so take that into account when setting up ring buffer. @@ -1158,15 +1152,13 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) to fit into a packet, and probably fit at least one any any outgoing packet that is not otherwise full. */ slot->mdpIdleTimeout=5000; // give up if nothing received for 5 seconds - slot->mdpRXWindowStart=slot->file_ofs; slot->mdpRXBitmap=0x00000000; // no blocks received yet - slot->mdpRXBlockLength=1024; // 200; + slot->mdpRXBlockLength=200; // 200; rhizome_fetch_mdp_requestblocks(slot); } else { /* We are requesting a manifest, which is stateless, except that we eventually give up. All we need to do now is send the request, and set our alarm to try again in case we haven't heard anything back. */ - slot->mdpNextTX=gettime_ms()+100; slot->mdpIdleTimeout=2000; // only try for two seconds rhizome_fetch_mdp_requestmanifest(slot); } @@ -1202,7 +1194,7 @@ void rhizome_fetch_write(struct rhizome_fetch_slot *slot) } } -void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int bytes) +int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int bytes) { if (bytes>(slot->file_len-slot->file_ofs)) bytes=slot->file_len-slot->file_ofs; @@ -1210,9 +1202,10 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by if (debug & DEBUG_RHIZOME_RX) DEBUGF("Failed to write %d bytes to file @ offset %d", bytes, slot->file_ofs); rhizome_fetch_close(slot); - return; + return -1; } slot->file_ofs+=bytes; + slot->last_write_time=gettime_ms(); if (slot->file_ofs>=slot->file_len) { /* got all of file */ if (debug & DEBUG_RHIZOME_RX) @@ -1257,13 +1250,16 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by } DEBUGF("Closing rhizome fetch slot = 0x%p",slot); rhizome_fetch_close(slot); - return; + return -1; } // reset inactivity timeout unschedule(&slot->alarm); slot->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT; slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT; schedule(&slot->alarm); + + // slot is still open + return 0; } int rhizome_received_content(unsigned char *bidprefix, @@ -1277,36 +1273,31 @@ int rhizome_received_content(unsigned char *bidprefix, if (slot->state==RHIZOME_FETCH_RXFILEMDP&&slot->bidP) { if (!bcmp(slot->bid,bidprefix,16)) { - if (slot->file_ofs==offset) { - debug=DEBUG_RHIZOME_RX; /* We don't know the file length until we receive the last block. If it isn't the last block, lie, and claim the end of file is yet to come. */ if (type=='T') slot->file_len=offset+count; - else slot->file_len=offset+count+1; - rhizome_write_content(slot,(char *)bytes,count); - debug=0; - slot->mdpRXWindowStart=offset+count; - slot->mdpLastRX=gettime_ms(); - - // TODO: Shift bitmap - - unschedule(&slot->alarm); - slot->alarm.function = rhizome_fetch_mdp_slot_callback; - slot->alarm.alarm=slot->mdpNextTX; - schedule(&slot->alarm); - - slot->mdpResponsesReceived++; - if (slot->mdpResponsesReceived==32) { - // We have received all responses, so immediately ask for more - rhizome_fetch_mdp_requestblocks(slot); + else { + slot->file_len=offset+count+1; + } + if (!rhizome_write_content(slot,(char *)bytes,count)) + { + slot->mdpResponsesOutstanding--; + if (slot->mdpResponsesOutstanding==0) { + // We have received all responses, so immediately ask for more + rhizome_fetch_mdp_requestblocks(slot); + } + + // TODO: Try flushing out stuck packets that we have kept due to + // packet loss / out-of-order delivery. + } RETURN(0); } else { - // TODO: Implement out-of-order reception so that lost packets + // TODO: Implement out-of-order buffering so that lost packets // don't cause wastage } RETURN(0); @@ -1332,11 +1323,6 @@ void rhizome_fetch_poll(struct sched_ent *alarm) case RHIZOME_FETCH_SENDINGHTTPREQUEST: rhizome_fetch_write(slot); return; - case RHIZOME_FETCH_RXFILEMDP: - if (debug & DEBUG_RHIZOME_RX) - DEBUG("Fetching via MDP not implemented"); - rhizome_fetch_close(slot); - break; case RHIZOME_FETCH_RXFILE: { /* Keep reading until we have the promised amount of data */ char buffer[8192]; @@ -1410,8 +1396,7 @@ void rhizome_fetch_poll(struct sched_ent *alarm) } break; default: - if (debug & DEBUG_RHIZOME_RX) - DEBUG("Closing rhizome fetch connection due to illegal/unimplemented state."); + WARNF("Closing rhizome fetch connection due to illegal/unimplemented state=%d.",slot->state); rhizome_fetch_close(slot); return; }