diff --git a/constants.h b/constants.h index 4c91c6c4..530d542c 100644 --- a/constants.h +++ b/constants.h @@ -123,10 +123,12 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #define MAX_SIGNATURES 16 #define MDP_PORT_KEYMAPREQUEST 1 -#define MDP_PORT_DNALOOKUP 2 #define MDP_PORT_ECHO 7 -#define MDP_PORT_VOMP 9 -#define MDP_PORT_DIRECTORY 10 +#define MDP_PORT_DNALOOKUP 10 +#define MDP_PORT_VOMP 12 +#define MDP_PORT_RHIZOME_REQUEST 13 +#define MDP_PORT_RHIZOME_RESPONSE 14 +#define MDP_PORT_DIRECTORY 15 #define MDP_PORT_NOREPLY 0x3f #define MDP_TYPE_MASK 0xff diff --git a/dataformats.c b/dataformats.c index a2c1133c..c45c0a7b 100644 --- a/dataformats.c +++ b/dataformats.c @@ -236,3 +236,48 @@ int str_is_uri(const char *uri) ++p; return p != q && *p == '\0'; } + +void write_uint64(unsigned char *o,uint64_t v) +{ + int i; + for(i=0;i<8;i++) + { *(o++)=v&0xff; v=v>>8; } +} + +void write_uint32(unsigned char *o,uint32_t v) +{ + int i; + for(i=0;i<4;i++) + { *(o++)=v&0xff; v=v>>8; } +} + +void write_uint16(unsigned char *o,uint16_t v) +{ + int i; + for(i=0;i<2;i++) + { *(o++)=v&0xff; v=v>>8; } +} + +uint64_t read_uint64(unsigned char *o) +{ + int i; + uint64_t v=0; + for(i=0;i<8;i++) v=(v<<8)|o[8-1-i]; + return v; +} + +uint32_t read_uint32(unsigned char *o) +{ + int i; + uint32_t v=0; + for(i=0;i<4;i++) v=(v<<8)|o[4-1-i]; + return v; +} + +uint16_t read_uint16(unsigned char *o) +{ + int i; + uint16_t v=0; + for(i=0;i<2;i++) v=(v<<8)|o[2-1-i]; + return v; +} diff --git a/overlay.c b/overlay.c index da810904..e0a61602 100644 --- a/overlay.c +++ b/overlay.c @@ -124,16 +124,16 @@ schedule(&_sched_##X); } /* Get rhizome server started BEFORE populating fd list so that the server's listen socket is in the list for poll() */ - if (rhizome_enabled()) { - if (!rhizome_opendb()){ - /* Rhizome http server needs to know which callback to attach + if (is_rhizome_enabled()) rhizome_opendb(); + + /* Rhizome http server needs to know which callback to attach to client sockets, so provide it here, along with the name to appear in time accounting statistics. */ - rhizome_http_server_start(rhizome_server_parse_http_request, - "rhizome_server_parse_http_request", - RHIZOME_HTTP_PORT,RHIZOME_HTTP_PORT_MAX); - } - } + if (is_rhizome_http_enabled()) + rhizome_http_server_start(rhizome_server_parse_http_request, + "rhizome_server_parse_http_request", + RHIZOME_HTTP_PORT,RHIZOME_HTTP_PORT_MAX); + // start the dna helper if configured dna_helper_start(); diff --git a/overlay_mdp.c b/overlay_mdp.c index 0821228b..601fd440 100644 --- a/overlay_mdp.c +++ b/overlay_mdp.c @@ -463,119 +463,7 @@ static int overlay_saw_mdp_frame(struct overlay_frame *frame, overlay_mdp_frame RETURN(WHY("Failed to pass received MDP frame to client")); } else { /* No socket is bound, ignore the packet ... except for magic sockets */ - switch(mdp->out.dst.port) { - case MDP_PORT_VOMP: - RETURN(vomp_mdp_received(mdp)); - case MDP_PORT_KEYMAPREQUEST: - /* Either respond with the appropriate SAS, or record this one if it - verifies out okay. */ - if (debug & DEBUG_MDPREQUESTS) - DEBUG("MDP_PORT_KEYMAPREQUEST"); - RETURN(keyring_mapping_request(keyring,mdp)); - case MDP_PORT_DNALOOKUP: /* attempt to resolve DID to SID */ - { - int cn=0,in=0,kp=0; - char did[64+1]; - int pll=mdp->out.payload_length; - if (pll>64) pll=64; - /* get did from the packet */ - if (mdp->out.payload_length<1) { - RETURN(WHY("Empty DID in DNA resolution request")); } - bcopy(&mdp->out.payload[0],&did[0],pll); - did[pll]=0; - - if (debug & DEBUG_MDPREQUESTS) - DEBUG("MDP_PORT_DNALOOKUP"); - - int results=0; - while(keyring_find_did(keyring,&cn,&in,&kp,did)) - { - /* package DID and Name into reply (we include the DID because - it could be a wild-card DID search, but the SID is implied - in the source address of our reply). */ - if (keyring->contexts[cn]->identities[in]->keypairs[kp]->private_key_len > DID_MAXSIZE) - /* skip excessively long DID records */ - continue; - const unsigned char *packedSid = keyring->contexts[cn]->identities[in]->keypairs[0]->public_key; - const char *unpackedDid = (const char *) keyring->contexts[cn]->identities[in]->keypairs[kp]->private_key; - const char *name = (const char *)keyring->contexts[cn]->identities[in]->keypairs[kp]->public_key; - // URI is sid://SIDHEX/DID - strbuf b = strbuf_alloca(SID_STRLEN + DID_MAXSIZE + 10); - strbuf_puts(b, "sid://"); - strbuf_tohex(b, packedSid, SID_SIZE); - strbuf_puts(b, "/local/"); - strbuf_puts(b, unpackedDid); - overlay_mdp_dnalookup_reply(&mdp->out.src, packedSid, strbuf_str(b), unpackedDid, name); - kp++; - results++; - } - if (!results) { - /* No local results, so see if servald has been configured to use - a DNA-helper that can provide additional mappings. This provides - a generalised interface for resolving telephone numbers into URIs. - The first use will be for resolving DIDs to SIP addresses for - OpenBTS boxes run by the OTI/Commotion project. - - The helper is run asynchronously, and the replies will be delivered - when results become available, so this function will return - immediately, so as not to cause blockages and delays in servald. - */ - dna_helper_enqueue(mdp, did, mdp->out.src.sid); - monitor_tell_formatted(MONITOR_DNAHELPER, "LOOKUP:%s:%d:%s\n", alloca_tohex_sid(mdp->out.src.sid), mdp->out.src.port, did); - } - RETURN(0); - } - break; - case MDP_PORT_ECHO: /* well known ECHO port for TCP/UDP and now MDP */ - { - /* Echo is easy: we swap the sender and receiver addresses (and thus port - numbers) and send the frame back. */ - - /* Swap addresses */ - overlay_mdp_swap_src_dst(mdp); - mdp->out.ttl=0; - - /* Prevent echo:echo connections and the resulting denial of service from triggering endless pongs. */ - if (mdp->out.dst.port==MDP_PORT_ECHO) { - RETURN(WHY("echo loop averted")); - } - /* If the packet was sent to broadcast, then replace broadcast address - with our local address. For now just responds with first local address */ - if (is_sid_broadcast(mdp->out.src.sid)) - { - if (my_subscriber) - bcopy(my_subscriber->sid, - mdp->out.src.sid,SID_SIZE); - else - /* No local addresses, so put all zeroes */ - bzero(mdp->out.src.sid,SID_SIZE); - } - - /* Always send PONGs auth-crypted so that the receipient knows - that they are genuine, and so that we avoid the extra cost - of signing (which is slower than auth-crypting) */ - int preserved=mdp->packetTypeAndFlags; - mdp->packetTypeAndFlags&=~(MDP_NOCRYPT|MDP_NOSIGN); - - /* queue frame for delivery */ - overlay_mdp_dispatch(mdp,0 /* system generated */, - NULL,0); - mdp->packetTypeAndFlags=preserved; - - /* and switch addresses back around in case the caller was planning on - using MDP structure again (this happens if there is a loop-back reply - and the frame needs sending on, as happens with broadcasts. MDP ping - is a simple application where this occurs). */ - overlay_mdp_swap_src_dst(mdp); - - } - break; - default: - /* Unbound socket. We won't be sending ICMP style connection refused - messages, partly because they are a waste of bandwidth. */ - RETURN(WHYF("Received packet for which no listening process exists (MDP ports: src=%d, dst=%d", - mdp->out.src.port,mdp->out.dst.port)); - } + RETURN(overlay_mdp_try_interal_services(mdp)); } break; default: @@ -635,6 +523,8 @@ int overlay_mdp_check_binding(struct subscriber *subscriber, int port, int userG case MDP_PORT_KEYMAPREQUEST: case MDP_PORT_VOMP: case MDP_PORT_DNALOOKUP: + case MDP_PORT_RHIZOME_RESPONSE: + case MDP_PORT_RHIZOME_REQUEST: return 0; } } diff --git a/overlay_mdp_services.c b/overlay_mdp_services.c new file mode 100644 index 00000000..52ac6087 --- /dev/null +++ b/overlay_mdp_services.c @@ -0,0 +1,303 @@ +/* +Copyright (C) 2010-2012 Paul Gardner-Stephen, Serval Project. + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +*/ + +#include +#include "serval.h" +#include "str.h" +#include "strbuf.h" +#include "overlay_buffer.h" +#include "overlay_address.h" +#include "overlay_packet.h" +#include "mdp_client.h" +#include "rhizome.h" +#include "crypto.h" +#include "log.h" + +int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp) +{ + IN(); + + uint64_t fileOffset= + read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8]); + uint32_t bitmap= + read_uint32(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8]); + uint16_t blockLength= + read_uint16(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8+4]); + if (blockLength>1024) RETURN(-1); + + if(0) { + DEBUGF("Someone sent me a rhizome request via MDP"); + DEBUGF("requestor sid = %s",alloca_tohex_sid(mdp->out.src.sid)); + DEBUGF("bundle ID = %s",alloca_tohex_bid(&mdp->out.payload[0])); + DEBUGF("manifest version = 0x%llx", + read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES])); + DEBUGF("file offset = 0x%llx",fileOffset); + DEBUGF("bitmap = 0x%08x",bitmap); + DEBUGF("block length = %d",blockLength); + } + + /* Find manifest that corresponds to BID and version. + If we don't have this combination, then do nothing. + If we do have the combination, then find the associated file, + and open the blob so that we can send some of it. + + TODO: If we have a newer version of the manifest, and the manifest is a + journal, then the newer version is okay to use to service this request. + */ + long long row_id=-1; + if (sqlite_exec_int64(&row_id, "SELECT rowid FROM FILES WHERE id IN (SELECT filehash FROM MANIFESTS WHERE manifests.version=%lld AND manifests.id='%s');", + read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES]), + alloca_tohex_bid(&mdp->out.payload[0])) < 1) + { + DEBUGF("Couldn't find stored file."); + RETURN(-1); + } + + sqlite3_blob *blob=NULL; + int ret=sqlite3_blob_open(rhizome_db, "main", "files", "data", + row_id, 0 /* read only */, &blob); + if (ret!=SQLITE_OK) + { + DEBUGF("Failed to open blob: %s",sqlite3_errmsg(rhizome_db)); + RETURN(-1); + } + int blob_bytes=sqlite3_blob_bytes(blob); + if (blob_bytessid,reply.out.src.sid,SID_SIZE); + // send replies to broadcast so that others can hear blocks and record them + // (not that preemptive listening is implemented yet). + reply.out.src.port=MDP_PORT_RHIZOME_RESPONSE; + memset(reply.out.dst.sid,0xff,SID_SIZE); + reply.out.dst.port=MDP_PORT_RHIZOME_RESPONSE; + reply.out.queue=OQ_ORDINARY; + reply.out.payload[0]='B'; // reply contains blocks + // include 16 bytes of BID prefix for identification + bcopy(&mdp->out.payload[0],&reply.out.payload[1],16); + // and version of manifest + bcopy(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES], + &reply.out.payload[1+16],8); + + int i; + for(i=0;i<32;i++) + if (!(bitmap&(1<<(31-i)))) + { + // calculate and set offset of block + uint64_t blockOffset=fileOffset+i*blockLength; + write_uint64(&reply.out.payload[1+16+8],blockOffset); + // work out how many bytes to read + int blockBytes=blob_bytes-blockOffset; + if (blockBytes>blockLength) blockBytes=blockLength; + // read data for block + if (blob_bytes>=blockOffset) { + sqlite3_blob_read(blob,&reply.out.payload[1+16+8+8], + blockBytes,blockOffset); + reply.out.payload_length=1+16+8+8+blockBytes; + + // Mark terminal block if required + if (blockOffset+blockBytes==blob_bytes) reply.out.payload[0]='T'; + // send packet + overlay_mdp_dispatch(&reply,0 /* system generated */, NULL,0); + } else break; + } + + sqlite3_blob_close(blob); blob=NULL; + + RETURN(-1); +} + +int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp) +{ + IN(); + + if (!mdp->out.payload_length) RETURN(-1); + + int type=mdp->out.payload[0]; + switch (type) { + case 'B': /* data block */ + case 'T': /* terminal data block */ + { + if (mdp->out.payload_length<(1+16+8+8+1)) RETURN(-1); + unsigned char *bidprefix=&mdp->out.payload[1]; + uint64_t version=read_uint64(&mdp->out.payload[1+16]); + uint64_t offset=read_uint64(&mdp->out.payload[1+16+8]); + int count=mdp->out.payload_length-(1+16+8+8); + unsigned char *bytes=&mdp->out.payload[1+16+8+8]; + if (0) + DEBUGF("Received %d bytes @ 0x%llx for %s* version 0x%llx", + count,offset,alloca_tohex(bidprefix,16),version); + + /* Now see if there is a slot that matches. If so, then + see if the bytes are in the window, and write them. + + If there is not matching slot, then consider setting + a slot to capture this files as it is being requested + by someone else. + */ + rhizome_received_content(bidprefix,version,offset,count,bytes,type); + + RETURN(-1); + } + break; + } + + + RETURN(-1); +} + +int overlay_mdp_service_dnalookup(overlay_mdp_frame *mdp) +{ + IN(); + int cn=0,in=0,kp=0; + char did[64+1]; + int pll=mdp->out.payload_length; + if (pll>64) pll=64; + /* get did from the packet */ + if (mdp->out.payload_length<1) { + RETURN(WHY("Empty DID in DNA resolution request")); } + bcopy(&mdp->out.payload[0],&did[0],pll); + did[pll]=0; + + if (debug & DEBUG_MDPREQUESTS) + DEBUG("MDP_PORT_DNALOOKUP"); + + int results=0; + while(keyring_find_did(keyring,&cn,&in,&kp,did)) + { + /* package DID and Name into reply (we include the DID because + it could be a wild-card DID search, but the SID is implied + in the source address of our reply). */ + if (keyring->contexts[cn]->identities[in]->keypairs[kp]->private_key_len > DID_MAXSIZE) + /* skip excessively long DID records */ + continue; + const unsigned char *packedSid = keyring->contexts[cn]->identities[in]->keypairs[0]->public_key; + const char *unpackedDid = (const char *) keyring->contexts[cn]->identities[in]->keypairs[kp]->private_key; + const char *name = (const char *)keyring->contexts[cn]->identities[in]->keypairs[kp]->public_key; + // URI is sid://SIDHEX/DID + strbuf b = strbuf_alloca(SID_STRLEN + DID_MAXSIZE + 10); + strbuf_puts(b, "sid://"); + strbuf_tohex(b, packedSid, SID_SIZE); + strbuf_puts(b, "/local/"); + strbuf_puts(b, unpackedDid); + overlay_mdp_dnalookup_reply(&mdp->out.src, packedSid, strbuf_str(b), unpackedDid, name); + kp++; + results++; + } + if (!results) { + /* No local results, so see if servald has been configured to use + a DNA-helper that can provide additional mappings. This provides + a generalised interface for resolving telephone numbers into URIs. + The first use will be for resolving DIDs to SIP addresses for + OpenBTS boxes run by the OTI/Commotion project. + + The helper is run asynchronously, and the replies will be delivered + when results become available, so this function will return + immediately, so as not to cause blockages and delays in servald. + */ + dna_helper_enqueue(mdp, did, mdp->out.src.sid); + monitor_tell_formatted(MONITOR_DNAHELPER, "LOOKUP:%s:%d:%s\n", + alloca_tohex_sid(mdp->out.src.sid), mdp->out.src.port, + did); + } + RETURN(0); +} + +int overlay_mdp_service_echo(overlay_mdp_frame *mdp) +{ + /* Echo is easy: we swap the sender and receiver addresses (and thus port + numbers) and send the frame back. */ + IN(); + + /* Swap addresses */ + overlay_mdp_swap_src_dst(mdp); + mdp->out.ttl=0; + + /* Prevent echo:echo connections and the resulting denial of service from triggering endless pongs. */ + if (mdp->out.dst.port==MDP_PORT_ECHO) { + RETURN(WHY("echo loop averted")); + } + /* If the packet was sent to broadcast, then replace broadcast address + with our local address. For now just responds with first local address */ + if (is_sid_broadcast(mdp->out.src.sid)) + { + if (my_subscriber) + bcopy(my_subscriber->sid, + mdp->out.src.sid,SID_SIZE); + else + /* No local addresses, so put all zeroes */ + bzero(mdp->out.src.sid,SID_SIZE); + } + + /* Always send PONGs auth-crypted so that the receipient knows + that they are genuine, and so that we avoid the extra cost + of signing (which is slower than auth-crypting) */ + int preserved=mdp->packetTypeAndFlags; + mdp->packetTypeAndFlags&=~(MDP_NOCRYPT|MDP_NOSIGN); + + /* queue frame for delivery */ + overlay_mdp_dispatch(mdp,0 /* system generated */, + NULL,0); + mdp->packetTypeAndFlags=preserved; + + /* and switch addresses back around in case the caller was planning on + using MDP structure again (this happens if there is a loop-back reply + and the frame needs sending on, as happens with broadcasts. MDP ping + is a simple application where this occurs). */ + overlay_mdp_swap_src_dst(mdp); + RETURN(0); +} + +int overlay_mdp_try_interal_services(overlay_mdp_frame *mdp) +{ + IN(); + switch(mdp->out.dst.port) { + case MDP_PORT_VOMP: RETURN(vomp_mdp_received(mdp)); + case MDP_PORT_KEYMAPREQUEST: RETURN(keyring_mapping_request(keyring,mdp)); + case MDP_PORT_DNALOOKUP: RETURN(overlay_mdp_service_dnalookup(mdp)); + case MDP_PORT_ECHO: RETURN(overlay_mdp_service_echo(mdp)); + case MDP_PORT_RHIZOME_REQUEST: + if (is_rhizome_mdp_server_running()) { + RETURN(overlay_mdp_service_rhizomerequest(mdp)); + } else break; + case MDP_PORT_RHIZOME_RESPONSE: + if (is_rhizome_mdp_server_running()) { + RETURN(overlay_mdp_service_rhizomeresponse(mdp)); + } else break; + } + + /* Unbound socket. We won't be sending ICMP style connection refused + messages, partly because they are a waste of bandwidth. */ + RETURN(WHYF("Received packet for which no listening process exists (MDP ports: src=%d, dst=%d", + mdp->out.src.port,mdp->out.dst.port)); +} diff --git a/packetformats.c b/packetformats.c new file mode 100644 index 00000000..587bfffd --- /dev/null +++ b/packetformats.c @@ -0,0 +1,86 @@ +/* +Serval Distributed Numbering Architecture (DNA) +Copyright (C) 2010 Paul Gardner-Stephen + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +*/ + +#include "serval.h" + +int packetOk(struct overlay_interface *interface, unsigned char *packet, size_t len, + unsigned char *transaction_id,int ttl, + struct sockaddr *recvaddr, size_t recvaddrlen,int parseP) +{ + if (len>8; } +} + +void write_uint32(unsigned char *o,uint32_t v) +{ + int i; + for(i=0;i<4;i++) + { *(o++)=v&0xff; v=v>>8; } +} + +void write_uint16(unsigned char *o,uint16_t v) +{ + int i; + for(i=0;i<2;i++) + { *(o++)=v&0xff; v=v>>8; } +} + +uint64_t read_uint64(unsigned char *o) +{ + int i; + uint64_t v=0; + for(i=0;i<8;i++) v=(v<<8)|o[8-1-i]; + return v; +} + +uint32_t read_uint32(unsigned char *o) +{ + int i; + uint32_t v=0; + for(i=0;i<4;i++) v=(v<<8)|o[4-1-i]; + return v; +} + +uint16_t read_uint16(unsigned char *o) +{ + int i; + uint16_t v=0; + for(i=0;i<2;i++) v=(v<<8)|o[2-1-i]; + return v; +} diff --git a/rhizome.c b/rhizome.c index 04cf7300..dfbc7085 100644 --- a/rhizome.c +++ b/rhizome.c @@ -22,9 +22,37 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include "rhizome.h" #include -int rhizome_enabled() +int is_rhizome_enabled() { - return confValueGetBoolean("rhizome.enable", 1);; + return confValueGetBoolean("rhizome.enable", 1); +} + +int is_rhizome_http_enabled() +{ + return confValueGetBoolean("rhizome.enable", 1) + && confValueGetBoolean("rhizome.http.enable", 1) + && rhizome_db; +} + +int is_rhizome_mdp_enabled() +{ + return confValueGetBoolean("rhizome.enable", 1) + && confValueGetBoolean("rhizome.mdp.enable", 1) + && rhizome_db; +} + +int is_rhizome_mdp_server_running() +{ + return is_rhizome_mdp_enabled(); +} + +int is_rhizome_advertise_enabled() +{ + return confValueGetBoolean("rhizome.enable", 1) + && confValueGetBoolean("rhizome.advertise.enable", 1) + && rhizome_db + && ( is_rhizome_http_server_running() + ||is_rhizome_mdp_server_running()); } int rhizome_fetch_delay_ms() diff --git a/rhizome.h b/rhizome.h index ab421cbf..42e0a178 100644 --- a/rhizome.h +++ b/rhizome.h @@ -326,8 +326,8 @@ int rhizome_sign_hash_with_key(rhizome_manifest *m,const unsigned char *sk, int rhizome_verify_bundle_privatekey(rhizome_manifest *m, const unsigned char *sk, const unsigned char *pk); int rhizome_find_bundle_author(rhizome_manifest *m); -int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, int timeout); -int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip); +int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, const unsigned char peersid[SID_SIZE], int timeout); +int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE]); /* one manifest is required per candidate, plus a few spare. so MAX_RHIZOME_MANIFESTS must be > MAX_CANDIDATES. @@ -335,7 +335,7 @@ int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in #define MAX_RHIZOME_MANIFESTS 24 #define MAX_CANDIDATES 16 -int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip); +int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE]); typedef struct rhizome_http_request { struct sched_ent alarm; @@ -428,6 +428,10 @@ struct http_response { unsigned long long content_length; const char * body; }; + +int rhizome_received_content(unsigned char *bidprefix,uint64_t version, + uint64_t offset,int count,unsigned char *bytes, + int type); int rhizome_server_set_response(rhizome_http_request *r, const struct http_response *h); int rhizome_server_free_http_request(rhizome_http_request *r); int rhizome_server_http_send_bytes(rhizome_http_request *r); @@ -439,6 +443,13 @@ int rhizome_http_server_start(int (*http_parse_func)(rhizome_http_request *), const char *http_parse_func_description, int port_low,int port_high); +int is_rhizome_enabled(); +int is_rhizome_mdp_enabled(); +int is_rhizome_http_enabled(); +int is_rhizome_advertise_enabled(); +int is_rhizome_mdp_server_running(); +int is_rhizome_http_server_running(); + typedef struct rhizome_direct_bundle_cursor { /* Where the current fill started */ long long start_size_high; @@ -552,7 +563,7 @@ enum rhizome_start_fetch_result { SLOTBUSY }; -enum rhizome_start_fetch_result rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length); +enum rhizome_start_fetch_result rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char sid[SID_SIZE],const unsigned char *prefix, size_t prefix_length); int rhizome_any_fetch_active(); int rhizome_any_fetch_queued(); diff --git a/rhizome_direct_http.c b/rhizome_direct_http.c index 7fc8f836..76c06ebd 100644 --- a/rhizome_direct_http.c +++ b/rhizome_direct_http.c @@ -731,6 +731,8 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r) DEBUGF("Dispatch size_high=%lld",r->cursor->size_high); rhizome_direct_transport_state_http *state = r->transport_specific_state; + unsigned char zerosid[SID_SIZE]="\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"; + int sock=socket(AF_INET, SOCK_STREAM, 0); if (sock==-1) { WHY_perror("socket"); @@ -876,7 +878,7 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r) Then as noted above, we can use that to pull the file down using existing routines. */ - if (!rhizome_fetch_request_manifest_by_prefix(&addr,&actionlist[i+1],RHIZOME_BAR_PREFIX_BYTES)) + if (!rhizome_fetch_request_manifest_by_prefix(&addr,zerosid,&actionlist[i+1],RHIZOME_BAR_PREFIX_BYTES)) { /* Fetching the manifest, and then using it to see if we want to fetch the file for import is all handled asynchronously, so just diff --git a/rhizome_fetch.c b/rhizome_fetch.c index 873e56d0..83d0f813 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -24,12 +24,19 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include "rhizome.h" #include "str.h" #include "strbuf_helpers.h" +#include "overlay_address.h" /* Represents a queued fetch of a bundle payload, for which the manifest is already known. */ struct rhizome_fetch_candidate { rhizome_manifest *manifest; - struct sockaddr_in peer; + + /* Address of node offering manifest. + Can be either IP+port for HTTP or it can be a SID + for MDP. */ + struct sockaddr_in peer_ipandport; + unsigned char peer_sid[SID_SIZE]; + int priority; }; @@ -39,22 +46,45 @@ struct rhizome_fetch_candidate { struct rhizome_fetch_slot { struct sched_ent alarm; // must be first element in struct rhizome_manifest *manifest; - struct sockaddr_in peer; + + struct sockaddr_in peer_ipandport; + unsigned char peer_sid[SID_SIZE]; + int state; #define RHIZOME_FETCH_FREE 0 #define RHIZOME_FETCH_CONNECTING 1 #define RHIZOME_FETCH_SENDINGHTTPREQUEST 2 #define RHIZOME_FETCH_RXHTTPHEADERS 3 #define RHIZOME_FETCH_RXFILE 4 +#define RHIZOME_FETCH_RXFILEMDP 5 FILE *file; char filename[1024]; + int64_t file_len; + int64_t file_ofs; + int64_t last_write_time; + + /* HTTP transport specific elements */ char request[1024]; int request_len; int request_ofs; - int64_t file_len; - int64_t file_ofs; + + /* MDP transport specific elements */ + unsigned char bid[RHIZOME_MANIFEST_ID_BYTES]; + int64_t bidVersion; + int bidP; + unsigned char prefix[RHIZOME_MANIFEST_ID_BYTES]; + int prefix_length; + int mdpIdleTimeout; + int mdpResponsesOutstanding; + int mdpRXBlockLength; + uint32_t mdpRXBitmap; + unsigned char mdpRXWindow[32*200]; }; +static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot); +static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot); +static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot); + /* Represents a queue of fetch candidates and a single active fetch for bundle payloads whose size * is less than a given threshold. * @@ -380,7 +410,8 @@ int rhizome_manifest_version_cache_lookup(rhizome_manifest *m) typedef struct ignored_manifest { unsigned char bid[crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES]; - struct sockaddr_in peer; + struct sockaddr_in peer_ipandport; + unsigned char peer_sid[SID_SIZE]; time_ms_t timeout; } ignored_manifest; @@ -400,7 +431,7 @@ typedef struct ignored_manifest_cache { a collision is exceedingly remote */ ignored_manifest_cache ignored; -int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip) +int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char *peersid) { int bin = m->cryptoSignPublic[0]>>(8-IGNORED_BIN_BITS); int slot; @@ -419,7 +450,7 @@ int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in return 0; } -int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, int timeout) +int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, const unsigned char peersid[SID_SIZE], int timeout) { /* The supplied manifest from a given IP has errors, so remember that it isn't worth considering */ @@ -439,8 +470,11 @@ int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in /* ignore for a while */ ignored.bins[bin].m[slot].timeout=gettime_ms()+timeout; bcopy(peerip, - &ignored.bins[bin].m[slot].peer, + &ignored.bins[bin].m[slot].peer_ipandport, sizeof(struct sockaddr_in)); + bcopy(peersid, + ignored.bins[bin].m[slot].peer_sid, + SID_SIZE); return 0; } @@ -468,31 +502,35 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot) WHYF_perror("fopen(`%s`, \"w\")", slot->filename); goto bail; } - if (slot->peer.sin_family == AF_INET) { + if (slot->peer_ipandport.sin_family == AF_INET) { /* Transfer via HTTP over IPv4 */ if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) { WHY_perror("socket"); - goto bail; + goto bail_http; } if (set_nonblock(sock) == -1) - goto bail; + goto bail_http; char buf[INET_ADDRSTRLEN]; - if (inet_ntop(AF_INET, &slot->peer.sin_addr, buf, sizeof buf) == NULL) { + if (inet_ntop(AF_INET, &slot->peer_ipandport.sin_addr, buf, sizeof buf) == NULL) { buf[0] = '*'; buf[1] = '\0'; } - if (connect(sock, (struct sockaddr*)&slot->peer, sizeof slot->peer) == -1) { + if (connect(sock, (struct sockaddr*)&slot->peer_ipandport, + sizeof slot->peer_ipandport) == -1) { if (errno == EINPROGRESS) { if (debug & DEBUG_RHIZOME_RX) DEBUGF("connect() returned EINPROGRESS"); } else { - WHYF_perror("connect(%d, %s:%u)", sock, buf, ntohs(slot->peer.sin_port)); - goto bail; + WHYF_perror("connect(%d, %s:%u)", sock, buf, + ntohs(slot->peer_ipandport.sin_port)); + goto bail_http; } } if (debug & DEBUG_RHIZOME_RX) - DEBUGF("RHIZOME HTTP REQUEST family=%u addr=%s port=%u %s", - slot->peer.sin_family, buf, ntohs(slot->peer.sin_port), alloca_str_toprint(slot->request) + DEBUGF("RHIZOME HTTP REQUEST family=%u addr=%s sid=%s port=%u %s", + slot->peer_ipandport.sin_family, + alloca_tohex_sid(slot->peer_sid), + buf, ntohs(slot->peer_ipandport.sin_port), alloca_str_toprint(slot->request) ); slot->alarm.poll.fd = sock; slot->request_ofs = 0; @@ -511,10 +549,16 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot) slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT; schedule(&slot->alarm); return 0; - } else { - /* TODO: Fetch via overlay */ - WHY("Rhizome fetching via overlay not implemented"); } + + bail_http: + /* Fetch via overlay, either because no IP address was provided, or because + the connection/attempt to fetch via HTTP failed. */ + WHY("Rhizome fetching via overlay not implemented"); + slot->state=RHIZOME_FETCH_RXFILEMDP; + rhizome_fetch_switch_to_mdp(slot); + return 0; + bail: if (sock != -1) close(sock); @@ -565,7 +609,7 @@ bail: * @author Andrew Bettison */ static enum rhizome_start_fetch_result -rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct sockaddr_in *peerip) +rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct sockaddr_in *peerip,unsigned const char *peersid) { if (slot->state != RHIZOME_FETCH_FREE) return SLOTBUSY; @@ -667,12 +711,23 @@ rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct // Start the fetch. //dump("peerip", peerip, sizeof *peerip); - slot->peer = *peerip; + + /* Prepare for fetching via HTTP */ + slot->peer_ipandport = *peerip; strbuf r = strbuf_local(slot->request, sizeof slot->request); strbuf_sprintf(r, "GET /rhizome/file/%s HTTP/1.0\r\n\r\n", m->fileHexHash); if (strbuf_overrun(r)) return WHY("request overrun"); slot->request_len = strbuf_len(r); + + /* Prepare for fetching via MDP */ + bcopy(peersid,slot->peer_sid,SID_SIZE); + bcopy(m->cryptoSignPublic,slot->bid,RHIZOME_MANIFEST_ID_BYTES); + slot->bidVersion=m->version; + DEBUGF("request bid=%s, version=0x%llx", + alloca_tohex_bid(slot->bid),slot->bidVersion); + slot->bidP=1; + if (!FORM_RHIZOME_IMPORT_PATH(slot->filename, "payload.%s", bid)) return -1; m->dataFileName = strdup(slot->filename); @@ -692,19 +747,30 @@ rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct * Returns -1 on error. */ enum rhizome_start_fetch_result -rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length) +rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, + const unsigned char peersid[SID_SIZE], + const unsigned char *prefix, size_t prefix_length) { assert(peerip); struct rhizome_fetch_slot *slot = rhizome_find_fetch_slot(MAX_MANIFEST_BYTES); if (slot == NULL) return SLOTBUSY; - slot->peer = *peerip; + + /* Prepare for fetching via HTTP */ + slot->peer_ipandport = *peerip; slot->manifest = NULL; strbuf r = strbuf_local(slot->request, sizeof slot->request); strbuf_sprintf(r, "GET /rhizome/manifestbyprefix/%s HTTP/1.0\r\n\r\n", alloca_tohex(prefix, prefix_length)); if (strbuf_overrun(r)) return WHY("request overrun"); slot->request_len = strbuf_len(r); + + /* Prepare for fetching via MDP */ + bcopy(peersid,slot->peer_sid,SID_SIZE); + bcopy(prefix,slot->prefix,prefix_length); + slot->prefix_length=prefix_length; + slot->bidP=0; + if (!FORM_RHIZOME_IMPORT_PATH(slot->filename, "manifest.%s", alloca_tohex(prefix, prefix_length))) return -1; if (schedule_fetch(slot) == -1) { @@ -726,7 +792,7 @@ static void rhizome_start_next_queued_fetch(struct rhizome_fetch_slot *slot) int i = 0; struct rhizome_fetch_candidate *c; while (i < q->candidate_queue_size && (c = &q->candidate_queue[i])->manifest) { - int result = rhizome_fetch(slot, c->manifest, &c->peer); + int result = rhizome_fetch(slot, c->manifest, &c->peer_ipandport,c->peer_sid); switch (result) { case SLOTBUSY: return; @@ -781,7 +847,7 @@ static void rhizome_start_next_queued_fetches(struct sched_ent *alarm) * * @author Andrew Bettison */ -int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip) +int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE]) { IN(); const char *bid = alloca_tohex_bid(m->cryptoSignPublic); @@ -807,7 +873,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock if (rhizome_manifest_verify(m) != 0) { WHY("Error verifying manifest when considering for import"); /* Don't waste time looking at this manifest again for a while */ - rhizome_queue_ignore_manifest(m, peerip, 60000); + rhizome_queue_ignore_manifest(m, peerip, peersid, 60000); rhizome_manifest_free(m); RETURN(-1); } @@ -841,7 +907,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock if (!m->selfSigned && rhizome_manifest_verify(m)) { WHY("Error verifying manifest when considering queuing for import"); /* Don't waste time looking at this manifest again for a while */ - rhizome_queue_ignore_manifest(m, peerip, 60000); + rhizome_queue_ignore_manifest(m, peerip, peersid, 60000); rhizome_manifest_free(m); RETURN(-1); } @@ -867,15 +933,16 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock if (!m->selfSigned && rhizome_manifest_verify(m)) { WHY("Error verifying manifest when considering queuing for import"); /* Don't waste time looking at this manifest again for a while */ - rhizome_queue_ignore_manifest(m, peerip, 60000); + rhizome_queue_ignore_manifest(m, peerip, peersid, 60000); rhizome_manifest_free(m); RETURN(-1); } struct rhizome_fetch_candidate *c = rhizome_fetch_insert(qi, ci); c->manifest = m; - c->peer = *peerip; c->priority = priority; + c->peer_ipandport = *peerip; + bcopy(peersid,c->peer_sid,SID_SIZE); if (debug & DEBUG_RHIZOME_RX) { DEBUG("Rhizome fetch queues:"); @@ -909,15 +976,16 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock static int rhizome_fetch_close(struct rhizome_fetch_slot *slot) { - if (debug & DEBUG_RHIZOME_RX) - DEBUGF("close Rhizome fetch slot=%d", slotno(slot)); + // if (debug & DEBUG_RHIZOME_RX) + DEBUGF("close Rhizome fetch slot=%d", slotno(slot)); assert(slot->state != RHIZOME_FETCH_FREE); /* close socket and stop watching it */ - unwatch(&slot->alarm); unschedule(&slot->alarm); + unwatch(&slot->alarm); close(slot->alarm.poll.fd); slot->alarm.poll.fd = -1; + slot->alarm.function=NULL; /* Free ephemeral data */ if (slot->file) @@ -940,14 +1008,173 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot) return 0; } +static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm) +{ + struct rhizome_fetch_slot *slot=(struct rhizome_fetch_slot*)alarm; + + if (slot->state!=5) { + DEBUGF("Stale alarm triggered on idle/reclaimed slot. Ignoring"); + unschedule(alarm); + return; + } + + long long now=gettime_ms(); + if (now-slot->last_write_time>slot->mdpIdleTimeout) { + DEBUGF("MDP connection timed out: last RX %lldms ago", + now-slot->last_write_time); + rhizome_fetch_close(slot); + return; + } + if (debug& DEBUG_RHIZOME_RX) + DEBUGF("Timeout waiting for blocks. Resending request for slot=0x%p", + slot); + if (slot->bidP) + rhizome_fetch_mdp_requestblocks(slot); + else + rhizome_fetch_mdp_requestmanifest(slot); +} + +static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) +{ + // only issue new requests every 133ms. + // we automatically re-issue once we have received all packets in this + // request also, so if there is no packet loss, we can go substantially + // faster. Optimising behaviour when there is no packet loss is an + // outstanding task. + + overlay_mdp_frame mdp; + + bzero(&mdp,sizeof(mdp)); + bcopy(my_subscriber->sid,mdp.out.src.sid,SID_SIZE); + mdp.out.src.port=MDP_PORT_RHIZOME_RESPONSE; + bcopy(slot->peer_sid,mdp.out.dst.sid,SID_SIZE); + mdp.out.dst.port=MDP_PORT_RHIZOME_REQUEST; + mdp.out.ttl=1; + mdp.packetTypeAndFlags=MDP_TX; + + mdp.out.queue=OQ_ORDINARY; + mdp.out.payload_length=RHIZOME_BAR_BYTES+8+8+4+2; + bcopy(slot->bid,&mdp.out.payload[0],RHIZOME_MANIFEST_ID_BYTES); + + write_uint64(&mdp.out.payload[RHIZOME_BAR_BYTES],slot->bidVersion); + write_uint64(&mdp.out.payload[RHIZOME_BAR_BYTES+8],slot->file_ofs); + write_uint32(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8],slot->mdpRXBitmap); + write_uint16(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8+4],slot->mdpRXBlockLength); + + if (0) + DEBUGF("src sid=%s, dst sid=%s, mdpRXWindowStart=0x%x", + alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid), + slot->file_ofs); + + overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0); + + // remember when we sent the request so that we can adjust the inter-request + // interval based on how fast the packets arrive. + slot->mdpResponsesOutstanding=32; // TODO: set according to bitmap + + unschedule(&slot->alarm); + slot->alarm.function = rhizome_fetch_mdp_slot_callback; + slot->alarm.alarm=gettime_ms()+133; + slot->alarm.deadline=slot->alarm.alarm+500; + schedule(&slot->alarm); + + return 0; +} + +static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot) +{ + if (slot->prefix_length<1||slot->prefix_length>32) { + // invalid request + DEBUGF("invalid MDP Rhizome request"); + return rhizome_fetch_close(slot); + } + + if ((gettime_ms()-slot->last_write_time)>slot->mdpIdleTimeout) { + // connection timed out + DEBUGF("MDP connection timedout"); + return rhizome_fetch_close(slot); + } + + overlay_mdp_frame mdp; + + bzero(&mdp,sizeof(mdp)); + bcopy(my_subscriber->sid,mdp.out.src.sid,SID_SIZE); + mdp.out.src.port=MDP_PORT_RHIZOME_RESPONSE; + bcopy(slot->peer_sid,mdp.out.dst.sid,SID_SIZE); + mdp.out.dst.port=MDP_PORT_RHIZOME_REQUEST; + mdp.out.ttl=1; + mdp.packetTypeAndFlags=MDP_TX; + + mdp.out.queue=OQ_ORDINARY; + mdp.out.payload_length=slot->prefix_length; + bcopy(slot->prefix,&mdp.out.payload[0],slot->prefix_length); + + overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0); + + DEBUGF("Set callback function, and set alarm"); + slot->alarm.function = rhizome_fetch_mdp_slot_callback; + slot->alarm.alarm=gettime_ms()+100; + slot->alarm.deadline=slot->alarm.alarm+500; + schedule(&slot->alarm); + + return 0; +} + +static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) +{ + DEBUGF("Trying to switch to MDP for Rhizome fetch: slot=0x%p",slot); + + /* close socket and stop watching it */ + unwatch(&slot->alarm); + unschedule(&slot->alarm); + if (slot->alarm.poll.fd!=-1) { + close(slot->alarm.poll.fd); + slot->alarm.poll.fd = -1; + } + + /* Begin MDP fetch process. + 1. Send initial request. + 2. Set timeout for next request (if fetching a file). + 3. Set timeout for no traffic received. + */ + + slot->state=RHIZOME_FETCH_RXFILEMDP; + + slot->last_write_time=gettime_ms(); + if (slot->bidP) { + /* We are requesting a file. The http request may have already received + some of the file, so take that into account when setting up ring buffer. + Then send the request for the next block of data, and set our alarm to + re-ask in a little while. "In a little while" is 133ms, which is roughly + the time it takes to send 16KB via WiFi broadcast at the 1Mbit base rate + (this will need tuning for non-WiFi interfaces). 16KB ~= 32 x 200 bytes + which is the block size we will use. 200bytes allows for several blocks + to fit into a packet, and probably fit at least one any any outgoing packet + that is not otherwise full. */ + slot->mdpIdleTimeout=5000; // give up if nothing received for 5 seconds + slot->mdpRXBitmap=0x00000000; // no blocks received yet + slot->mdpRXBlockLength=200; // 200; + rhizome_fetch_mdp_requestblocks(slot); + } else { + /* We are requesting a manifest, which is stateless, except that we eventually + give up. All we need to do now is send the request, and set our alarm to + try again in case we haven't heard anything back. */ + slot->mdpIdleTimeout=2000; // only try for two seconds + rhizome_fetch_mdp_requestmanifest(slot); + } + + return 0; +} + void rhizome_fetch_write(struct rhizome_fetch_slot *slot) { if (debug & DEBUG_RHIZOME_RX) DEBUGF("write_nonblock(%d, %s)", slot->alarm.poll.fd, alloca_toprint(-1, &slot->request[slot->request_ofs], slot->request_len-slot->request_ofs)); int bytes = write_nonblock(slot->alarm.poll.fd, &slot->request[slot->request_ofs], slot->request_len-slot->request_ofs); if (bytes == -1) { - WHY("Got error while sending HTTP request. Closing."); - rhizome_fetch_close(slot); + WHY("Got error while sending HTTP request."); + rhizome_fetch_switch_to_mdp(slot); + return; } else { // reset timeout unschedule(&slot->alarm); @@ -967,7 +1194,7 @@ void rhizome_fetch_write(struct rhizome_fetch_slot *slot) } } -void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int bytes) +int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int bytes) { if (bytes>(slot->file_len-slot->file_ofs)) bytes=slot->file_len-slot->file_ofs; @@ -975,9 +1202,10 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by if (debug & DEBUG_RHIZOME_RX) DEBUGF("Failed to write %d bytes to file @ offset %d", bytes, slot->file_ofs); rhizome_fetch_close(slot); - return; + return -1; } slot->file_ofs+=bytes; + slot->last_write_time=gettime_ms(); if (slot->file_ofs>=slot->file_len) { /* got all of file */ if (debug & DEBUG_RHIZOME_RX) @@ -987,13 +1215,19 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by if (slot->manifest) { // Were fetching payload, now we have it. if (!rhizome_import_received_bundle(slot->manifest)){ - char buf[INET_ADDRSTRLEN]; - if (inet_ntop(AF_INET, &slot->peer.sin_addr, buf, sizeof buf) == NULL) { - buf[0] = '*'; - buf[1] = '\0'; + if (slot->state==RHIZOME_FETCH_RXFILE) { + char buf[INET_ADDRSTRLEN]; + if (inet_ntop(AF_INET, &slot->peer_ipandport.sin_addr, buf, sizeof buf) == NULL) { + buf[0] = '*'; + buf[1] = '\0'; + } + INFOF("Completed http request from %s:%u for file %s", + buf, ntohs(slot->peer_ipandport.sin_port), + slot->manifest->fileHexHash); + } else { + INFOF("Completed MDP request from %s for file %s", + alloca_tohex_sid(slot->peer_sid), slot->manifest->fileHexHash); } - INFOF("Completed http request from %s:%u for file %s", - buf, ntohs(slot->peer.sin_port), slot->manifest->fileHexHash); } } else { /* This was to fetch the manifest, so now fetch the file if needed */ @@ -1007,19 +1241,76 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by rhizome_manifest_free(m); } else { DEBUGF("All looks good for importing manifest id=%s", alloca_tohex_bid(m->cryptoSignPublic)); - dump("slot->peer",&slot->peer,sizeof(slot->peer)); - rhizome_suggest_queue_manifest_import(m, &slot->peer); + dump("slot->peerip",&slot->peer_ipandport,sizeof(slot->peer_ipandport)); + dump("slot->peersid",&slot->peer_sid,sizeof(slot->peer_sid)); + rhizome_suggest_queue_manifest_import(m, &slot->peer_ipandport, + slot->peer_sid); } } } + DEBUGF("Closing rhizome fetch slot = 0x%p",slot); rhizome_fetch_close(slot); - return; + return -1; } // reset inactivity timeout unschedule(&slot->alarm); slot->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT; slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT; schedule(&slot->alarm); + + // slot is still open + return 0; +} + +int rhizome_received_content(unsigned char *bidprefix, + uint64_t version, uint64_t offset, + int count,unsigned char *bytes,int type) +{ + IN(); + int i; + for(i=0;istate==RHIZOME_FETCH_RXFILEMDP&&slot->bidP) { + if (!bcmp(slot->bid,bidprefix,16)) + { + if (slot->file_ofs==offset) { + /* We don't know the file length until we receive the last + block. If it isn't the last block, lie, and claim the end of + file is yet to come. */ + if (type=='T') slot->file_len=offset+count; + else { + slot->file_len=offset+count+1; + + } + + if (!rhizome_write_content(slot,(char *)bytes,count)) + { + slot->mdpResponsesOutstanding--; + if (slot->mdpResponsesOutstanding==0) { + // We have received all responses, so immediately ask for more + rhizome_fetch_mdp_requestblocks(slot); + } + + // TODO: Try flushing out stuck packets that we have kept due to + // packet loss / out-of-order delivery. + } + + RETURN(0); + } else { + // TODO: Implement out-of-order buffering so that lost packets + // don't cause wastage + } + RETURN(0); + } + else + if (1) + DEBUGF("Doesn't match this slot = 0x%p, because BIDs don't match: %s* vs %s", + alloca_tohex(bidprefix,16), + alloca_tohex_bid(rhizome_fetch_queues[i].active.bid)); + } + } + + RETURN(-1); } void rhizome_fetch_poll(struct sched_ent *alarm) @@ -1028,88 +1319,87 @@ void rhizome_fetch_poll(struct sched_ent *alarm) if (alarm->poll.revents & (POLLIN | POLLOUT)) { switch (slot->state) { - case RHIZOME_FETCH_CONNECTING: - case RHIZOME_FETCH_SENDINGHTTPREQUEST: - rhizome_fetch_write(slot); + case RHIZOME_FETCH_CONNECTING: + case RHIZOME_FETCH_SENDINGHTTPREQUEST: + rhizome_fetch_write(slot); + return; + case RHIZOME_FETCH_RXFILE: { + /* Keep reading until we have the promised amount of data */ + char buffer[8192]; + sigPipeFlag = 0; + int bytes = read_nonblock(slot->alarm.poll.fd, buffer, sizeof buffer); + /* If we got some data, see if we have found the end of the HTTP request */ + if (bytes > 0) { + rhizome_write_content(slot, buffer, bytes); return; - case RHIZOME_FETCH_RXFILE: { - /* Keep reading until we have the promised amount of data */ - char buffer[8192]; - sigPipeFlag = 0; - int bytes = read_nonblock(slot->alarm.poll.fd, buffer, sizeof buffer); - /* If we got some data, see if we have found the end of the HTTP request */ - if (bytes > 0) { - rhizome_write_content(slot, buffer, bytes); - return; - } else { - if (debug & DEBUG_RHIZOME_RX) - DEBUG("Empty read, closing connection"); - rhizome_fetch_close(slot); - return; - } - if (sigPipeFlag) { - if (debug & DEBUG_RHIZOME_RX) - DEBUG("Received SIGPIPE, closing connection"); - rhizome_fetch_close(slot); - return; - } - } - break; - case RHIZOME_FETCH_RXHTTPHEADERS: { - /* Keep reading until we have two CR/LFs in a row */ - sigPipeFlag = 0; - int bytes = read_nonblock(slot->alarm.poll.fd, &slot->request[slot->request_len], 1024 - slot->request_len - 1); - /* If we got some data, see if we have found the end of the HTTP reply */ - if (bytes > 0) { - // reset timeout - unschedule(&slot->alarm); - slot->alarm.alarm = gettime_ms() + RHIZOME_IDLE_TIMEOUT; - slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT; - schedule(&slot->alarm); - slot->request_len += bytes; - if (http_header_complete(slot->request, slot->request_len, bytes)) { - if (debug & DEBUG_RHIZOME_RX) - DEBUGF("Got HTTP reply: %s", alloca_toprint(160, slot->request, slot->request_len)); - /* We have all the reply headers, so parse them, taking care of any following bytes of - content. */ - struct http_response_parts parts; - if (unpack_http_response(slot->request, &parts) == -1) { - if (debug & DEBUG_RHIZOME_RX) - DEBUGF("Failed HTTP request: failed to unpack http response"); - rhizome_fetch_close(slot); - return; - } - if (parts.code != 200) { - if (debug & DEBUG_RHIZOME_RX) - DEBUGF("Failed HTTP request: rhizome server returned %d != 200 OK", parts.code); - rhizome_fetch_close(slot); - return; - } - if (parts.content_length == -1) { - if (debug & DEBUG_RHIZOME_RX) - DEBUGF("Invalid HTTP reply: missing Content-Length header"); - rhizome_fetch_close(slot); - return; - } - slot->file_len = parts.content_length; - /* We have all we need. The file is already open, so just write out any initial bytes of - the body we read. - */ - slot->state = RHIZOME_FETCH_RXFILE; - int content_bytes = slot->request + slot->request_len - parts.content_start; - if (content_bytes > 0){ - rhizome_write_content(slot, parts.content_start, content_bytes); - return; - } - } - } - break; - default: + } else { + if (debug & DEBUG_RHIZOME_RX) + DEBUG("Empty read, closing connection"); + rhizome_fetch_switch_to_mdp(slot); + return; + } + if (sigPipeFlag) { + if (debug & DEBUG_RHIZOME_RX) + DEBUG("Received SIGPIPE, closing connection"); + rhizome_fetch_switch_to_mdp(slot); + return; + } + } + break; + case RHIZOME_FETCH_RXHTTPHEADERS: { + /* Keep reading until we have two CR/LFs in a row */ + sigPipeFlag = 0; + int bytes = read_nonblock(slot->alarm.poll.fd, &slot->request[slot->request_len], 1024 - slot->request_len - 1); + /* If we got some data, see if we have found the end of the HTTP reply */ + if (bytes > 0) { + // reset timeout + unschedule(&slot->alarm); + slot->alarm.alarm = gettime_ms() + RHIZOME_IDLE_TIMEOUT; + slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT; + schedule(&slot->alarm); + slot->request_len += bytes; + if (http_header_complete(slot->request, slot->request_len, bytes)) { if (debug & DEBUG_RHIZOME_RX) - DEBUG("Closing rhizome fetch connection due to illegal/unimplemented state."); - rhizome_fetch_close(slot); - return; + DEBUGF("Got HTTP reply: %s", alloca_toprint(160, slot->request, slot->request_len)); + /* We have all the reply headers, so parse them, taking care of any following bytes of + content. */ + struct http_response_parts parts; + if (unpack_http_response(slot->request, &parts) == -1) { + if (debug & DEBUG_RHIZOME_RX) + DEBUGF("Failed HTTP request: failed to unpack http response"); + rhizome_fetch_switch_to_mdp(slot); + return; + } + if (parts.code != 200) { + if (debug & DEBUG_RHIZOME_RX) + DEBUGF("Failed HTTP request: rhizome server returned %d != 200 OK", parts.code); + rhizome_fetch_switch_to_mdp(slot); + return; + } + if (parts.content_length == -1) { + if (debug & DEBUG_RHIZOME_RX) + DEBUGF("Invalid HTTP reply: missing Content-Length header"); + rhizome_fetch_switch_to_mdp(slot); + return; + } + slot->file_len = parts.content_length; + /* We have all we need. The file is already open, so just write out any initial bytes of + the body we read. + */ + slot->state = RHIZOME_FETCH_RXFILE; + int content_bytes = slot->request + slot->request_len - parts.content_start; + if (content_bytes > 0){ + rhizome_write_content(slot, parts.content_start, content_bytes); + return; + } } + } + break; + default: + WARNF("Closing rhizome fetch connection due to illegal/unimplemented state=%d.",slot->state); + rhizome_fetch_close(slot); + return; + } } } if (alarm->poll.revents==0 || alarm->poll.revents & (POLLHUP | POLLERR)){ diff --git a/rhizome_http.c b/rhizome_http.c index 72c92b82..23c8451a 100644 --- a/rhizome_http.c +++ b/rhizome_http.c @@ -72,7 +72,7 @@ unsigned char favicon_bytes[]={ ,0,0,0,0,0,0,0,0,0,0,0,0,0,0}; int favicon_len=318; -int rhizome_http_server_running() +int is_rhizome_http_server_running() { return rhizome_server_socket != -1; } diff --git a/rhizome_packetformats.c b/rhizome_packetformats.c index af014e7d..48204f11 100644 --- a/rhizome_packetformats.c +++ b/rhizome_packetformats.c @@ -145,7 +145,7 @@ int overlay_rhizome_add_advertisements(struct decode_context *context, int inter XXX We will move all processing of Rhizome into a separate process so that the CPU delays caused by Rhizome verifying signatures isn't a problem. */ - if (!rhizome_http_server_running() || !rhizome_db) + if (!is_rhizome_advertise_enabled()) RETURN(0); int pass; @@ -409,7 +409,7 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long RETURN(0); } - if (rhizome_ignore_manifest_check(m, &httpaddr)) + if (rhizome_ignore_manifest_check(m, &httpaddr,f->source->sid)) { /* Ignoring manifest that has caused us problems recently */ if (1) WARNF("Ignoring manifest with errors: %s*", manifest_id_prefix); @@ -424,7 +424,7 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long } else { if (debug & DEBUG_RHIZOME_ADS) DEBUG("Not seen before."); - rhizome_suggest_queue_manifest_import(m, &httpaddr); + rhizome_suggest_queue_manifest_import(m, &httpaddr,f->source->sid); // the above function will free the manifest structure, make sure we don't free it again m=NULL; } @@ -435,7 +435,7 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long DEBUG("Unverified manifest has errors - so not processing any further."); /* Don't waste any time on this manifest in future attempts for at least a minute. */ - rhizome_queue_ignore_manifest(m, &httpaddr, 60000); + rhizome_queue_ignore_manifest(m, &httpaddr,f->source->sid, 60000); } if (m) { rhizome_manifest_free(m); diff --git a/serval.h b/serval.h index 6f75bd84..f38bb985 100644 --- a/serval.h +++ b/serval.h @@ -738,6 +738,7 @@ void overlay_dummy_poll(struct sched_ent *alarm); void overlay_route_tick(struct sched_ent *alarm); void server_shutdown_check(struct sched_ent *alarm); void overlay_mdp_poll(struct sched_ent *alarm); +int overlay_mdp_try_interal_services(overlay_mdp_frame *mdp); void fd_periodicstats(struct sched_ent *alarm); void rhizome_check_connections(struct sched_ent *alarm); @@ -770,4 +771,11 @@ void dump_stack(); int olsr_init_socket(void); int olsr_send(struct overlay_frame *frame); +void write_uint64(unsigned char *o,uint64_t v); +void write_uint16(unsigned char *o,uint16_t v); +void write_uint32(unsigned char *o,uint32_t v); +uint64_t read_uint64(unsigned char *o); +uint32_t read_uint32(unsigned char *o); +uint16_t read_uint16(unsigned char *o); + #endif // __SERVALD_SERVALD_H diff --git a/sourcefiles.mk b/sourcefiles.mk index 0c6e5ef3..b1c9fb41 100644 --- a/sourcefiles.mk +++ b/sourcefiles.mk @@ -28,6 +28,7 @@ SERVAL_SOURCES = $(SERVAL_BASE)audiodevices.c \ $(SERVAL_BASE)overlay_interface.c \ $(SERVAL_BASE)overlay_queue.c \ $(SERVAL_BASE)overlay_mdp.c \ + $(SERVAL_BASE)overlay_mdp_services.c \ $(SERVAL_BASE)overlay_olsr.c \ $(SERVAL_BASE)overlay_packetformats.c \ $(SERVAL_BASE)overlay_payload.c \ diff --git a/tests/rhizomeprotocol b/tests/rhizomeprotocol index 9314d935..b8b81db0 100755 --- a/tests/rhizomeprotocol +++ b/tests/rhizomeprotocol @@ -89,10 +89,102 @@ test_FileTransfer() { assert_rhizome_received file2 } -doc_FileTransferBig="Big new bundle transfers to one node" -setup_FileTransferBig() { +doc_DisablingHTTPServer="Disabling HTTP rhizome transports works" +setup_DisablingHTTPServer() { setup_common set_instance +A + rhizome_add_file file1 + executeOk_servald config set rhizome.http.enable 0 + start_servald_instances +A +} +test_DisablingHTTPServer() { + !rhizome_http_server_started +} + +doc_HTTPTransport="Rhizome over HTTP transport" +setup_HTTPTransport() { + setup_common + set_instance +B + executeOk_servald config set rhizome.mdp.enable 0 + set_instance +A + executeOk_servald config set rhizome.mdp.enable 0 + rhizome_add_file file1 + start_servald_instances +A +B + foreach_instance +A assert_peers_are_instances +B + foreach_instance +B assert_peers_are_instances +A +} +test_HTTPTransport() { + wait_until bundle_received_by $BID:$VERSION +B + set_instance +B + executeOk_servald rhizome list '' + assert_rhizome_list --fromhere=0 file1 + assert_rhizome_received file1 + set_instance +A + rhizome_update_file file1 file2 + set_instance +B + wait_until bundle_received_by $BID:$VERSION +B + executeOk_servald rhizome list '' + assert_rhizome_list --fromhere=0 file2 + assert_rhizome_received file2 +} + +doc_MDPTransport="Rhizome over MDP transport" +setup_MDPTransport() { + setup_common + set_instance +B + executeOk_servald config set rhizome.http.enable 0 + set_instance +A + executeOk_servald config set rhizome.http.enable 0 + rhizome_add_file file1 + start_servald_instances +A +B + foreach_instance +A assert_peers_are_instances +B + foreach_instance +B assert_peers_are_instances +A +} +test_MDPTransport() { + wait_until bundle_received_by $BID:$VERSION +B + set_instance +B + executeOk_servald rhizome list '' + assert_rhizome_list --fromhere=0 file1 + assert_rhizome_received file1 + set_instance +A + rhizome_update_file file1 file2 + set_instance +B + wait_until bundle_received_by $BID:$VERSION +B + executeOk_servald rhizome list '' + assert_rhizome_list --fromhere=0 file2 + assert_rhizome_received file2 +} + +doc_FileTransferBigMDP="Big new bundle transfers to one node via MDP" +setup_FileTransferBigMDP() { + setup_common + set_instance +B + executeOk_servald config set rhizome.http.enable 0 + set_instance +A + executeOk_servald config set rhizome.http.enable 0 + dd if=/dev/urandom of=file1 bs=1k count=1k 2>&1 + echo x >>file1 + ls -l file1 + rhizome_add_file file1 + start_servald_instances +A +B + foreach_instance +A assert_peers_are_instances +B + foreach_instance +B assert_peers_are_instances +A +} +test_FileTransferBigMDP() { + wait_until bundle_received_by $BID:$VERSION +B + set_instance +B + executeOk_servald rhizome list '' + assert_rhizome_list --fromhere=0 file1 + assert_rhizome_received file1 +} + +doc_FileTransferBig="Big new bundle transfers to one node via HTTP" +setup_FileTransferBig() { + setup_common + set_instance +B + executeOk_servald config set rhizome.mdp.enable 0 + set_instance +A + executeOk_servald config set rhizome.mdp.enable 0 dd if=/dev/urandom of=file1 bs=1k count=1k 2>&1 echo x >>file1 ls -l file1 @@ -109,10 +201,20 @@ test_FileTransferBig() { assert_rhizome_received file1 } -doc_FileTransferMulti="New bundle transfers to four nodes" +doc_FileTransferMulti="New bundle transfers to four nodes via HTTP" setup_FileTransferMulti() { setup_common set_instance +A + executeOk_servald config set rhizome.mdp.enable 0 + set_instance +B + executeOk_servald config set rhizome.mdp.enable 0 + set_instance +C + executeOk_servald config set rhizome.mdp.enable 0 + set_instance +D + executeOk_servald config set rhizome.mdp.enable 0 + set_instance +E + executeOk_servald config set rhizome.mdp.enable 0 + set_instance +A rhizome_add_file file1 start_servald_instances +A +B +C +D +E foreach_instance +A assert_peers_are_instances +B +C +D +E @@ -130,6 +232,38 @@ test_FileTransferMulti() { done } +doc_FileTransferMultiMDP="New bundle transfers to four nodes via MDP" +setup_FileTransferMultiMDP() { + setup_common + set_instance +A + executeOk_servald config set rhizome.http.enable 0 + set_instance +B + executeOk_servald config set rhizome.http.enable 0 + set_instance +C + executeOk_servald config set rhizome.http.enable 0 + set_instance +D + executeOk_servald config set rhizome.http.enable 0 + set_instance +E + executeOk_servald config set rhizome.http.enable 0 + set_instance +A + rhizome_add_file file1 + start_servald_instances +A +B +C +D +E + foreach_instance +A assert_peers_are_instances +B +C +D +E + foreach_instance +B assert_peers_are_instances +A +C +D +E + foreach_instance +C assert_peers_are_instances +A +B +D +E + foreach_instance +D assert_peers_are_instances +A +B +C +E +} +test_FileTransferMultiMDP() { + wait_until bundle_received_by $BID:$VERSION +B +C +D +E + for i in B C D E; do + set_instance +$i + executeOk_servald rhizome list '' + assert_rhizome_list --fromhere=0 file1 + assert_rhizome_received file1 + done +} + + doc_FileTransferDelete="Payload deletion transfers to one node" setup_FileTransferDelete() { setup_common