Merge remote-tracking branch 'origin/master' into packet_format

Conflicts:
	constants.h
	packetformats.c
This commit is contained in:
Jeremy Lakeman 2012-12-03 14:10:32 +10:30
commit 2b21a691ca
15 changed files with 1064 additions and 264 deletions

View File

@ -123,10 +123,12 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#define MAX_SIGNATURES 16
#define MDP_PORT_KEYMAPREQUEST 1
#define MDP_PORT_DNALOOKUP 2
#define MDP_PORT_ECHO 7
#define MDP_PORT_VOMP 9
#define MDP_PORT_DIRECTORY 10
#define MDP_PORT_DNALOOKUP 10
#define MDP_PORT_VOMP 12
#define MDP_PORT_RHIZOME_REQUEST 13
#define MDP_PORT_RHIZOME_RESPONSE 14
#define MDP_PORT_DIRECTORY 15
#define MDP_PORT_NOREPLY 0x3f
#define MDP_TYPE_MASK 0xff

View File

@ -236,3 +236,48 @@ int str_is_uri(const char *uri)
++p;
return p != q && *p == '\0';
}
void write_uint64(unsigned char *o,uint64_t v)
{
int i;
for(i=0;i<8;i++)
{ *(o++)=v&0xff; v=v>>8; }
}
void write_uint32(unsigned char *o,uint32_t v)
{
int i;
for(i=0;i<4;i++)
{ *(o++)=v&0xff; v=v>>8; }
}
void write_uint16(unsigned char *o,uint16_t v)
{
int i;
for(i=0;i<2;i++)
{ *(o++)=v&0xff; v=v>>8; }
}
uint64_t read_uint64(unsigned char *o)
{
int i;
uint64_t v=0;
for(i=0;i<8;i++) v=(v<<8)|o[8-1-i];
return v;
}
uint32_t read_uint32(unsigned char *o)
{
int i;
uint32_t v=0;
for(i=0;i<4;i++) v=(v<<8)|o[4-1-i];
return v;
}
uint16_t read_uint16(unsigned char *o)
{
int i;
uint16_t v=0;
for(i=0;i<2;i++) v=(v<<8)|o[2-1-i];
return v;
}

View File

@ -124,16 +124,16 @@ schedule(&_sched_##X); }
/* Get rhizome server started BEFORE populating fd list so that
the server's listen socket is in the list for poll() */
if (rhizome_enabled()) {
if (!rhizome_opendb()){
/* Rhizome http server needs to know which callback to attach
if (is_rhizome_enabled()) rhizome_opendb();
/* Rhizome http server needs to know which callback to attach
to client sockets, so provide it here, along with the name to
appear in time accounting statistics. */
rhizome_http_server_start(rhizome_server_parse_http_request,
"rhizome_server_parse_http_request",
RHIZOME_HTTP_PORT,RHIZOME_HTTP_PORT_MAX);
}
}
if (is_rhizome_http_enabled())
rhizome_http_server_start(rhizome_server_parse_http_request,
"rhizome_server_parse_http_request",
RHIZOME_HTTP_PORT,RHIZOME_HTTP_PORT_MAX);
// start the dna helper if configured
dna_helper_start();

View File

@ -463,119 +463,7 @@ static int overlay_saw_mdp_frame(struct overlay_frame *frame, overlay_mdp_frame
RETURN(WHY("Failed to pass received MDP frame to client"));
} else {
/* No socket is bound, ignore the packet ... except for magic sockets */
switch(mdp->out.dst.port) {
case MDP_PORT_VOMP:
RETURN(vomp_mdp_received(mdp));
case MDP_PORT_KEYMAPREQUEST:
/* Either respond with the appropriate SAS, or record this one if it
verifies out okay. */
if (debug & DEBUG_MDPREQUESTS)
DEBUG("MDP_PORT_KEYMAPREQUEST");
RETURN(keyring_mapping_request(keyring,mdp));
case MDP_PORT_DNALOOKUP: /* attempt to resolve DID to SID */
{
int cn=0,in=0,kp=0;
char did[64+1];
int pll=mdp->out.payload_length;
if (pll>64) pll=64;
/* get did from the packet */
if (mdp->out.payload_length<1) {
RETURN(WHY("Empty DID in DNA resolution request")); }
bcopy(&mdp->out.payload[0],&did[0],pll);
did[pll]=0;
if (debug & DEBUG_MDPREQUESTS)
DEBUG("MDP_PORT_DNALOOKUP");
int results=0;
while(keyring_find_did(keyring,&cn,&in,&kp,did))
{
/* package DID and Name into reply (we include the DID because
it could be a wild-card DID search, but the SID is implied
in the source address of our reply). */
if (keyring->contexts[cn]->identities[in]->keypairs[kp]->private_key_len > DID_MAXSIZE)
/* skip excessively long DID records */
continue;
const unsigned char *packedSid = keyring->contexts[cn]->identities[in]->keypairs[0]->public_key;
const char *unpackedDid = (const char *) keyring->contexts[cn]->identities[in]->keypairs[kp]->private_key;
const char *name = (const char *)keyring->contexts[cn]->identities[in]->keypairs[kp]->public_key;
// URI is sid://SIDHEX/DID
strbuf b = strbuf_alloca(SID_STRLEN + DID_MAXSIZE + 10);
strbuf_puts(b, "sid://");
strbuf_tohex(b, packedSid, SID_SIZE);
strbuf_puts(b, "/local/");
strbuf_puts(b, unpackedDid);
overlay_mdp_dnalookup_reply(&mdp->out.src, packedSid, strbuf_str(b), unpackedDid, name);
kp++;
results++;
}
if (!results) {
/* No local results, so see if servald has been configured to use
a DNA-helper that can provide additional mappings. This provides
a generalised interface for resolving telephone numbers into URIs.
The first use will be for resolving DIDs to SIP addresses for
OpenBTS boxes run by the OTI/Commotion project.
The helper is run asynchronously, and the replies will be delivered
when results become available, so this function will return
immediately, so as not to cause blockages and delays in servald.
*/
dna_helper_enqueue(mdp, did, mdp->out.src.sid);
monitor_tell_formatted(MONITOR_DNAHELPER, "LOOKUP:%s:%d:%s\n", alloca_tohex_sid(mdp->out.src.sid), mdp->out.src.port, did);
}
RETURN(0);
}
break;
case MDP_PORT_ECHO: /* well known ECHO port for TCP/UDP and now MDP */
{
/* Echo is easy: we swap the sender and receiver addresses (and thus port
numbers) and send the frame back. */
/* Swap addresses */
overlay_mdp_swap_src_dst(mdp);
mdp->out.ttl=0;
/* Prevent echo:echo connections and the resulting denial of service from triggering endless pongs. */
if (mdp->out.dst.port==MDP_PORT_ECHO) {
RETURN(WHY("echo loop averted"));
}
/* If the packet was sent to broadcast, then replace broadcast address
with our local address. For now just responds with first local address */
if (is_sid_broadcast(mdp->out.src.sid))
{
if (my_subscriber)
bcopy(my_subscriber->sid,
mdp->out.src.sid,SID_SIZE);
else
/* No local addresses, so put all zeroes */
bzero(mdp->out.src.sid,SID_SIZE);
}
/* Always send PONGs auth-crypted so that the receipient knows
that they are genuine, and so that we avoid the extra cost
of signing (which is slower than auth-crypting) */
int preserved=mdp->packetTypeAndFlags;
mdp->packetTypeAndFlags&=~(MDP_NOCRYPT|MDP_NOSIGN);
/* queue frame for delivery */
overlay_mdp_dispatch(mdp,0 /* system generated */,
NULL,0);
mdp->packetTypeAndFlags=preserved;
/* and switch addresses back around in case the caller was planning on
using MDP structure again (this happens if there is a loop-back reply
and the frame needs sending on, as happens with broadcasts. MDP ping
is a simple application where this occurs). */
overlay_mdp_swap_src_dst(mdp);
}
break;
default:
/* Unbound socket. We won't be sending ICMP style connection refused
messages, partly because they are a waste of bandwidth. */
RETURN(WHYF("Received packet for which no listening process exists (MDP ports: src=%d, dst=%d",
mdp->out.src.port,mdp->out.dst.port));
}
RETURN(overlay_mdp_try_interal_services(mdp));
}
break;
default:
@ -635,6 +523,8 @@ int overlay_mdp_check_binding(struct subscriber *subscriber, int port, int userG
case MDP_PORT_KEYMAPREQUEST:
case MDP_PORT_VOMP:
case MDP_PORT_DNALOOKUP:
case MDP_PORT_RHIZOME_RESPONSE:
case MDP_PORT_RHIZOME_REQUEST:
return 0;
}
}

303
overlay_mdp_services.c Normal file
View File

@ -0,0 +1,303 @@
/*
Copyright (C) 2010-2012 Paul Gardner-Stephen, Serval Project.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include <sys/stat.h>
#include "serval.h"
#include "str.h"
#include "strbuf.h"
#include "overlay_buffer.h"
#include "overlay_address.h"
#include "overlay_packet.h"
#include "mdp_client.h"
#include "rhizome.h"
#include "crypto.h"
#include "log.h"
int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp)
{
IN();
uint64_t fileOffset=
read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8]);
uint32_t bitmap=
read_uint32(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8]);
uint16_t blockLength=
read_uint16(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8+4]);
if (blockLength>1024) RETURN(-1);
if(0) {
DEBUGF("Someone sent me a rhizome request via MDP");
DEBUGF("requestor sid = %s",alloca_tohex_sid(mdp->out.src.sid));
DEBUGF("bundle ID = %s",alloca_tohex_bid(&mdp->out.payload[0]));
DEBUGF("manifest version = 0x%llx",
read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES]));
DEBUGF("file offset = 0x%llx",fileOffset);
DEBUGF("bitmap = 0x%08x",bitmap);
DEBUGF("block length = %d",blockLength);
}
/* Find manifest that corresponds to BID and version.
If we don't have this combination, then do nothing.
If we do have the combination, then find the associated file,
and open the blob so that we can send some of it.
TODO: If we have a newer version of the manifest, and the manifest is a
journal, then the newer version is okay to use to service this request.
*/
long long row_id=-1;
if (sqlite_exec_int64(&row_id, "SELECT rowid FROM FILES WHERE id IN (SELECT filehash FROM MANIFESTS WHERE manifests.version=%lld AND manifests.id='%s');",
read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES]),
alloca_tohex_bid(&mdp->out.payload[0])) < 1)
{
DEBUGF("Couldn't find stored file.");
RETURN(-1);
}
sqlite3_blob *blob=NULL;
int ret=sqlite3_blob_open(rhizome_db, "main", "files", "data",
row_id, 0 /* read only */, &blob);
if (ret!=SQLITE_OK)
{
DEBUGF("Failed to open blob: %s",sqlite3_errmsg(rhizome_db));
RETURN(-1);
}
int blob_bytes=sqlite3_blob_bytes(blob);
if (blob_bytes<fileOffset) {
sqlite3_blob_close(blob); blob=NULL;
RETURN(-1);
}
overlay_mdp_frame reply;
bzero(&reply,sizeof(reply));
// Reply is broadcast, so we cannot authcrypt, and signing is too time consuming
// for low devices. The result is that an attacker can prevent rhizome transfers
// if they want to by injecting fake blocks. The alternative is to not broadcast
// back replies, and then we can authcrypt.
// multiple receivers starting at different times, we really need merkle-tree hashing.
// so multiple receivers is not realistic for now. So use non-broadcast unicode
// for now would seem the safest. But that would stop us from allowing multiple
// receivers in the special case where additional nodes begin listening in from the
// beginning.
reply.packetTypeAndFlags=MDP_TX|MDP_NOCRYPT|MDP_NOSIGN;
reply.out.ttl=1;
bcopy(my_subscriber->sid,reply.out.src.sid,SID_SIZE);
// send replies to broadcast so that others can hear blocks and record them
// (not that preemptive listening is implemented yet).
reply.out.src.port=MDP_PORT_RHIZOME_RESPONSE;
memset(reply.out.dst.sid,0xff,SID_SIZE);
reply.out.dst.port=MDP_PORT_RHIZOME_RESPONSE;
reply.out.queue=OQ_ORDINARY;
reply.out.payload[0]='B'; // reply contains blocks
// include 16 bytes of BID prefix for identification
bcopy(&mdp->out.payload[0],&reply.out.payload[1],16);
// and version of manifest
bcopy(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES],
&reply.out.payload[1+16],8);
int i;
for(i=0;i<32;i++)
if (!(bitmap&(1<<(31-i))))
{
// calculate and set offset of block
uint64_t blockOffset=fileOffset+i*blockLength;
write_uint64(&reply.out.payload[1+16+8],blockOffset);
// work out how many bytes to read
int blockBytes=blob_bytes-blockOffset;
if (blockBytes>blockLength) blockBytes=blockLength;
// read data for block
if (blob_bytes>=blockOffset) {
sqlite3_blob_read(blob,&reply.out.payload[1+16+8+8],
blockBytes,blockOffset);
reply.out.payload_length=1+16+8+8+blockBytes;
// Mark terminal block if required
if (blockOffset+blockBytes==blob_bytes) reply.out.payload[0]='T';
// send packet
overlay_mdp_dispatch(&reply,0 /* system generated */, NULL,0);
} else break;
}
sqlite3_blob_close(blob); blob=NULL;
RETURN(-1);
}
int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp)
{
IN();
if (!mdp->out.payload_length) RETURN(-1);
int type=mdp->out.payload[0];
switch (type) {
case 'B': /* data block */
case 'T': /* terminal data block */
{
if (mdp->out.payload_length<(1+16+8+8+1)) RETURN(-1);
unsigned char *bidprefix=&mdp->out.payload[1];
uint64_t version=read_uint64(&mdp->out.payload[1+16]);
uint64_t offset=read_uint64(&mdp->out.payload[1+16+8]);
int count=mdp->out.payload_length-(1+16+8+8);
unsigned char *bytes=&mdp->out.payload[1+16+8+8];
if (0)
DEBUGF("Received %d bytes @ 0x%llx for %s* version 0x%llx",
count,offset,alloca_tohex(bidprefix,16),version);
/* Now see if there is a slot that matches. If so, then
see if the bytes are in the window, and write them.
If there is not matching slot, then consider setting
a slot to capture this files as it is being requested
by someone else.
*/
rhizome_received_content(bidprefix,version,offset,count,bytes,type);
RETURN(-1);
}
break;
}
RETURN(-1);
}
int overlay_mdp_service_dnalookup(overlay_mdp_frame *mdp)
{
IN();
int cn=0,in=0,kp=0;
char did[64+1];
int pll=mdp->out.payload_length;
if (pll>64) pll=64;
/* get did from the packet */
if (mdp->out.payload_length<1) {
RETURN(WHY("Empty DID in DNA resolution request")); }
bcopy(&mdp->out.payload[0],&did[0],pll);
did[pll]=0;
if (debug & DEBUG_MDPREQUESTS)
DEBUG("MDP_PORT_DNALOOKUP");
int results=0;
while(keyring_find_did(keyring,&cn,&in,&kp,did))
{
/* package DID and Name into reply (we include the DID because
it could be a wild-card DID search, but the SID is implied
in the source address of our reply). */
if (keyring->contexts[cn]->identities[in]->keypairs[kp]->private_key_len > DID_MAXSIZE)
/* skip excessively long DID records */
continue;
const unsigned char *packedSid = keyring->contexts[cn]->identities[in]->keypairs[0]->public_key;
const char *unpackedDid = (const char *) keyring->contexts[cn]->identities[in]->keypairs[kp]->private_key;
const char *name = (const char *)keyring->contexts[cn]->identities[in]->keypairs[kp]->public_key;
// URI is sid://SIDHEX/DID
strbuf b = strbuf_alloca(SID_STRLEN + DID_MAXSIZE + 10);
strbuf_puts(b, "sid://");
strbuf_tohex(b, packedSid, SID_SIZE);
strbuf_puts(b, "/local/");
strbuf_puts(b, unpackedDid);
overlay_mdp_dnalookup_reply(&mdp->out.src, packedSid, strbuf_str(b), unpackedDid, name);
kp++;
results++;
}
if (!results) {
/* No local results, so see if servald has been configured to use
a DNA-helper that can provide additional mappings. This provides
a generalised interface for resolving telephone numbers into URIs.
The first use will be for resolving DIDs to SIP addresses for
OpenBTS boxes run by the OTI/Commotion project.
The helper is run asynchronously, and the replies will be delivered
when results become available, so this function will return
immediately, so as not to cause blockages and delays in servald.
*/
dna_helper_enqueue(mdp, did, mdp->out.src.sid);
monitor_tell_formatted(MONITOR_DNAHELPER, "LOOKUP:%s:%d:%s\n",
alloca_tohex_sid(mdp->out.src.sid), mdp->out.src.port,
did);
}
RETURN(0);
}
int overlay_mdp_service_echo(overlay_mdp_frame *mdp)
{
/* Echo is easy: we swap the sender and receiver addresses (and thus port
numbers) and send the frame back. */
IN();
/* Swap addresses */
overlay_mdp_swap_src_dst(mdp);
mdp->out.ttl=0;
/* Prevent echo:echo connections and the resulting denial of service from triggering endless pongs. */
if (mdp->out.dst.port==MDP_PORT_ECHO) {
RETURN(WHY("echo loop averted"));
}
/* If the packet was sent to broadcast, then replace broadcast address
with our local address. For now just responds with first local address */
if (is_sid_broadcast(mdp->out.src.sid))
{
if (my_subscriber)
bcopy(my_subscriber->sid,
mdp->out.src.sid,SID_SIZE);
else
/* No local addresses, so put all zeroes */
bzero(mdp->out.src.sid,SID_SIZE);
}
/* Always send PONGs auth-crypted so that the receipient knows
that they are genuine, and so that we avoid the extra cost
of signing (which is slower than auth-crypting) */
int preserved=mdp->packetTypeAndFlags;
mdp->packetTypeAndFlags&=~(MDP_NOCRYPT|MDP_NOSIGN);
/* queue frame for delivery */
overlay_mdp_dispatch(mdp,0 /* system generated */,
NULL,0);
mdp->packetTypeAndFlags=preserved;
/* and switch addresses back around in case the caller was planning on
using MDP structure again (this happens if there is a loop-back reply
and the frame needs sending on, as happens with broadcasts. MDP ping
is a simple application where this occurs). */
overlay_mdp_swap_src_dst(mdp);
RETURN(0);
}
int overlay_mdp_try_interal_services(overlay_mdp_frame *mdp)
{
IN();
switch(mdp->out.dst.port) {
case MDP_PORT_VOMP: RETURN(vomp_mdp_received(mdp));
case MDP_PORT_KEYMAPREQUEST: RETURN(keyring_mapping_request(keyring,mdp));
case MDP_PORT_DNALOOKUP: RETURN(overlay_mdp_service_dnalookup(mdp));
case MDP_PORT_ECHO: RETURN(overlay_mdp_service_echo(mdp));
case MDP_PORT_RHIZOME_REQUEST:
if (is_rhizome_mdp_server_running()) {
RETURN(overlay_mdp_service_rhizomerequest(mdp));
} else break;
case MDP_PORT_RHIZOME_RESPONSE:
if (is_rhizome_mdp_server_running()) {
RETURN(overlay_mdp_service_rhizomeresponse(mdp));
} else break;
}
/* Unbound socket. We won't be sending ICMP style connection refused
messages, partly because they are a waste of bandwidth. */
RETURN(WHYF("Received packet for which no listening process exists (MDP ports: src=%d, dst=%d",
mdp->out.src.port,mdp->out.dst.port));
}

86
packetformats.c Normal file
View File

@ -0,0 +1,86 @@
/*
Serval Distributed Numbering Architecture (DNA)
Copyright (C) 2010 Paul Gardner-Stephen
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include "serval.h"
int packetOk(struct overlay_interface *interface, unsigned char *packet, size_t len,
unsigned char *transaction_id,int ttl,
struct sockaddr *recvaddr, size_t recvaddrlen,int parseP)
{
if (len<HEADERFIELDS_LEN) return WHY("Packet is too short");
if (packet[0]==0x4F&&packet[1]==0x10)
{
if (interface!=NULL)
{
return packetOkOverlay(interface,packet,len,transaction_id,ttl,
recvaddr,recvaddrlen,parseP);
}
else
/* We ignore overlay mesh packets in simple server mode, which is indicated by interface==-1 */
return WHY("Ignoring overlay mesh packet");
}
return WHY("Packet type not recognised.");
}
void write_uint64(unsigned char *o,uint64_t v)
{
int i;
for(i=0;i<8;i++)
{ *(o++)=v&0xff; v=v>>8; }
}
void write_uint32(unsigned char *o,uint32_t v)
{
int i;
for(i=0;i<4;i++)
{ *(o++)=v&0xff; v=v>>8; }
}
void write_uint16(unsigned char *o,uint16_t v)
{
int i;
for(i=0;i<2;i++)
{ *(o++)=v&0xff; v=v>>8; }
}
uint64_t read_uint64(unsigned char *o)
{
int i;
uint64_t v=0;
for(i=0;i<8;i++) v=(v<<8)|o[8-1-i];
return v;
}
uint32_t read_uint32(unsigned char *o)
{
int i;
uint32_t v=0;
for(i=0;i<4;i++) v=(v<<8)|o[4-1-i];
return v;
}
uint16_t read_uint16(unsigned char *o)
{
int i;
uint16_t v=0;
for(i=0;i<2;i++) v=(v<<8)|o[2-1-i];
return v;
}

View File

@ -22,9 +22,37 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "rhizome.h"
#include <stdlib.h>
int rhizome_enabled()
int is_rhizome_enabled()
{
return confValueGetBoolean("rhizome.enable", 1);;
return confValueGetBoolean("rhizome.enable", 1);
}
int is_rhizome_http_enabled()
{
return confValueGetBoolean("rhizome.enable", 1)
&& confValueGetBoolean("rhizome.http.enable", 1)
&& rhizome_db;
}
int is_rhizome_mdp_enabled()
{
return confValueGetBoolean("rhizome.enable", 1)
&& confValueGetBoolean("rhizome.mdp.enable", 1)
&& rhizome_db;
}
int is_rhizome_mdp_server_running()
{
return is_rhizome_mdp_enabled();
}
int is_rhizome_advertise_enabled()
{
return confValueGetBoolean("rhizome.enable", 1)
&& confValueGetBoolean("rhizome.advertise.enable", 1)
&& rhizome_db
&& ( is_rhizome_http_server_running()
||is_rhizome_mdp_server_running());
}
int rhizome_fetch_delay_ms()

View File

@ -326,8 +326,8 @@ int rhizome_sign_hash_with_key(rhizome_manifest *m,const unsigned char *sk,
int rhizome_verify_bundle_privatekey(rhizome_manifest *m, const unsigned char *sk,
const unsigned char *pk);
int rhizome_find_bundle_author(rhizome_manifest *m);
int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, int timeout);
int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip);
int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, const unsigned char peersid[SID_SIZE], int timeout);
int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE]);
/* one manifest is required per candidate, plus a few spare.
so MAX_RHIZOME_MANIFESTS must be > MAX_CANDIDATES.
@ -335,7 +335,7 @@ int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in
#define MAX_RHIZOME_MANIFESTS 24
#define MAX_CANDIDATES 16
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip);
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE]);
typedef struct rhizome_http_request {
struct sched_ent alarm;
@ -428,6 +428,10 @@ struct http_response {
unsigned long long content_length;
const char * body;
};
int rhizome_received_content(unsigned char *bidprefix,uint64_t version,
uint64_t offset,int count,unsigned char *bytes,
int type);
int rhizome_server_set_response(rhizome_http_request *r, const struct http_response *h);
int rhizome_server_free_http_request(rhizome_http_request *r);
int rhizome_server_http_send_bytes(rhizome_http_request *r);
@ -439,6 +443,13 @@ int rhizome_http_server_start(int (*http_parse_func)(rhizome_http_request *),
const char *http_parse_func_description,
int port_low,int port_high);
int is_rhizome_enabled();
int is_rhizome_mdp_enabled();
int is_rhizome_http_enabled();
int is_rhizome_advertise_enabled();
int is_rhizome_mdp_server_running();
int is_rhizome_http_server_running();
typedef struct rhizome_direct_bundle_cursor {
/* Where the current fill started */
long long start_size_high;
@ -552,7 +563,7 @@ enum rhizome_start_fetch_result {
SLOTBUSY
};
enum rhizome_start_fetch_result rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length);
enum rhizome_start_fetch_result rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char sid[SID_SIZE],const unsigned char *prefix, size_t prefix_length);
int rhizome_any_fetch_active();
int rhizome_any_fetch_queued();

View File

@ -731,6 +731,8 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r)
DEBUGF("Dispatch size_high=%lld",r->cursor->size_high);
rhizome_direct_transport_state_http *state = r->transport_specific_state;
unsigned char zerosid[SID_SIZE]="\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0";
int sock=socket(AF_INET, SOCK_STREAM, 0);
if (sock==-1) {
WHY_perror("socket");
@ -876,7 +878,7 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r)
Then as noted above, we can use that to pull the file down using
existing routines.
*/
if (!rhizome_fetch_request_manifest_by_prefix(&addr,&actionlist[i+1],RHIZOME_BAR_PREFIX_BYTES))
if (!rhizome_fetch_request_manifest_by_prefix(&addr,zerosid,&actionlist[i+1],RHIZOME_BAR_PREFIX_BYTES))
{
/* Fetching the manifest, and then using it to see if we want to
fetch the file for import is all handled asynchronously, so just

View File

@ -24,12 +24,19 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "rhizome.h"
#include "str.h"
#include "strbuf_helpers.h"
#include "overlay_address.h"
/* Represents a queued fetch of a bundle payload, for which the manifest is already known.
*/
struct rhizome_fetch_candidate {
rhizome_manifest *manifest;
struct sockaddr_in peer;
/* Address of node offering manifest.
Can be either IP+port for HTTP or it can be a SID
for MDP. */
struct sockaddr_in peer_ipandport;
unsigned char peer_sid[SID_SIZE];
int priority;
};
@ -39,22 +46,45 @@ struct rhizome_fetch_candidate {
struct rhizome_fetch_slot {
struct sched_ent alarm; // must be first element in struct
rhizome_manifest *manifest;
struct sockaddr_in peer;
struct sockaddr_in peer_ipandport;
unsigned char peer_sid[SID_SIZE];
int state;
#define RHIZOME_FETCH_FREE 0
#define RHIZOME_FETCH_CONNECTING 1
#define RHIZOME_FETCH_SENDINGHTTPREQUEST 2
#define RHIZOME_FETCH_RXHTTPHEADERS 3
#define RHIZOME_FETCH_RXFILE 4
#define RHIZOME_FETCH_RXFILEMDP 5
FILE *file;
char filename[1024];
int64_t file_len;
int64_t file_ofs;
int64_t last_write_time;
/* HTTP transport specific elements */
char request[1024];
int request_len;
int request_ofs;
int64_t file_len;
int64_t file_ofs;
/* MDP transport specific elements */
unsigned char bid[RHIZOME_MANIFEST_ID_BYTES];
int64_t bidVersion;
int bidP;
unsigned char prefix[RHIZOME_MANIFEST_ID_BYTES];
int prefix_length;
int mdpIdleTimeout;
int mdpResponsesOutstanding;
int mdpRXBlockLength;
uint32_t mdpRXBitmap;
unsigned char mdpRXWindow[32*200];
};
static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot);
static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot);
static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot);
/* Represents a queue of fetch candidates and a single active fetch for bundle payloads whose size
* is less than a given threshold.
*
@ -380,7 +410,8 @@ int rhizome_manifest_version_cache_lookup(rhizome_manifest *m)
typedef struct ignored_manifest {
unsigned char bid[crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES];
struct sockaddr_in peer;
struct sockaddr_in peer_ipandport;
unsigned char peer_sid[SID_SIZE];
time_ms_t timeout;
} ignored_manifest;
@ -400,7 +431,7 @@ typedef struct ignored_manifest_cache {
a collision is exceedingly remote */
ignored_manifest_cache ignored;
int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip)
int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char *peersid)
{
int bin = m->cryptoSignPublic[0]>>(8-IGNORED_BIN_BITS);
int slot;
@ -419,7 +450,7 @@ int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in
return 0;
}
int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, int timeout)
int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, const unsigned char peersid[SID_SIZE], int timeout)
{
/* The supplied manifest from a given IP has errors, so remember
that it isn't worth considering */
@ -439,8 +470,11 @@ int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in
/* ignore for a while */
ignored.bins[bin].m[slot].timeout=gettime_ms()+timeout;
bcopy(peerip,
&ignored.bins[bin].m[slot].peer,
&ignored.bins[bin].m[slot].peer_ipandport,
sizeof(struct sockaddr_in));
bcopy(peersid,
ignored.bins[bin].m[slot].peer_sid,
SID_SIZE);
return 0;
}
@ -468,31 +502,35 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
WHYF_perror("fopen(`%s`, \"w\")", slot->filename);
goto bail;
}
if (slot->peer.sin_family == AF_INET) {
if (slot->peer_ipandport.sin_family == AF_INET) {
/* Transfer via HTTP over IPv4 */
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
WHY_perror("socket");
goto bail;
goto bail_http;
}
if (set_nonblock(sock) == -1)
goto bail;
goto bail_http;
char buf[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, &slot->peer.sin_addr, buf, sizeof buf) == NULL) {
if (inet_ntop(AF_INET, &slot->peer_ipandport.sin_addr, buf, sizeof buf) == NULL) {
buf[0] = '*';
buf[1] = '\0';
}
if (connect(sock, (struct sockaddr*)&slot->peer, sizeof slot->peer) == -1) {
if (connect(sock, (struct sockaddr*)&slot->peer_ipandport,
sizeof slot->peer_ipandport) == -1) {
if (errno == EINPROGRESS) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("connect() returned EINPROGRESS");
} else {
WHYF_perror("connect(%d, %s:%u)", sock, buf, ntohs(slot->peer.sin_port));
goto bail;
WHYF_perror("connect(%d, %s:%u)", sock, buf,
ntohs(slot->peer_ipandport.sin_port));
goto bail_http;
}
}
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("RHIZOME HTTP REQUEST family=%u addr=%s port=%u %s",
slot->peer.sin_family, buf, ntohs(slot->peer.sin_port), alloca_str_toprint(slot->request)
DEBUGF("RHIZOME HTTP REQUEST family=%u addr=%s sid=%s port=%u %s",
slot->peer_ipandport.sin_family,
alloca_tohex_sid(slot->peer_sid),
buf, ntohs(slot->peer_ipandport.sin_port), alloca_str_toprint(slot->request)
);
slot->alarm.poll.fd = sock;
slot->request_ofs = 0;
@ -511,10 +549,16 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
schedule(&slot->alarm);
return 0;
} else {
/* TODO: Fetch via overlay */
WHY("Rhizome fetching via overlay not implemented");
}
bail_http:
/* Fetch via overlay, either because no IP address was provided, or because
the connection/attempt to fetch via HTTP failed. */
WHY("Rhizome fetching via overlay not implemented");
slot->state=RHIZOME_FETCH_RXFILEMDP;
rhizome_fetch_switch_to_mdp(slot);
return 0;
bail:
if (sock != -1)
close(sock);
@ -565,7 +609,7 @@ bail:
* @author Andrew Bettison <andrew@servalproject.com>
*/
static enum rhizome_start_fetch_result
rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct sockaddr_in *peerip)
rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct sockaddr_in *peerip,unsigned const char *peersid)
{
if (slot->state != RHIZOME_FETCH_FREE)
return SLOTBUSY;
@ -667,12 +711,23 @@ rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct
// Start the fetch.
//dump("peerip", peerip, sizeof *peerip);
slot->peer = *peerip;
/* Prepare for fetching via HTTP */
slot->peer_ipandport = *peerip;
strbuf r = strbuf_local(slot->request, sizeof slot->request);
strbuf_sprintf(r, "GET /rhizome/file/%s HTTP/1.0\r\n\r\n", m->fileHexHash);
if (strbuf_overrun(r))
return WHY("request overrun");
slot->request_len = strbuf_len(r);
/* Prepare for fetching via MDP */
bcopy(peersid,slot->peer_sid,SID_SIZE);
bcopy(m->cryptoSignPublic,slot->bid,RHIZOME_MANIFEST_ID_BYTES);
slot->bidVersion=m->version;
DEBUGF("request bid=%s, version=0x%llx",
alloca_tohex_bid(slot->bid),slot->bidVersion);
slot->bidP=1;
if (!FORM_RHIZOME_IMPORT_PATH(slot->filename, "payload.%s", bid))
return -1;
m->dataFileName = strdup(slot->filename);
@ -692,19 +747,30 @@ rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct
* Returns -1 on error.
*/
enum rhizome_start_fetch_result
rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length)
rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip,
const unsigned char peersid[SID_SIZE],
const unsigned char *prefix, size_t prefix_length)
{
assert(peerip);
struct rhizome_fetch_slot *slot = rhizome_find_fetch_slot(MAX_MANIFEST_BYTES);
if (slot == NULL)
return SLOTBUSY;
slot->peer = *peerip;
/* Prepare for fetching via HTTP */
slot->peer_ipandport = *peerip;
slot->manifest = NULL;
strbuf r = strbuf_local(slot->request, sizeof slot->request);
strbuf_sprintf(r, "GET /rhizome/manifestbyprefix/%s HTTP/1.0\r\n\r\n", alloca_tohex(prefix, prefix_length));
if (strbuf_overrun(r))
return WHY("request overrun");
slot->request_len = strbuf_len(r);
/* Prepare for fetching via MDP */
bcopy(peersid,slot->peer_sid,SID_SIZE);
bcopy(prefix,slot->prefix,prefix_length);
slot->prefix_length=prefix_length;
slot->bidP=0;
if (!FORM_RHIZOME_IMPORT_PATH(slot->filename, "manifest.%s", alloca_tohex(prefix, prefix_length)))
return -1;
if (schedule_fetch(slot) == -1) {
@ -726,7 +792,7 @@ static void rhizome_start_next_queued_fetch(struct rhizome_fetch_slot *slot)
int i = 0;
struct rhizome_fetch_candidate *c;
while (i < q->candidate_queue_size && (c = &q->candidate_queue[i])->manifest) {
int result = rhizome_fetch(slot, c->manifest, &c->peer);
int result = rhizome_fetch(slot, c->manifest, &c->peer_ipandport,c->peer_sid);
switch (result) {
case SLOTBUSY:
return;
@ -781,7 +847,7 @@ static void rhizome_start_next_queued_fetches(struct sched_ent *alarm)
*
* @author Andrew Bettison <andrew@servalproject.com>
*/
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip)
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE])
{
IN();
const char *bid = alloca_tohex_bid(m->cryptoSignPublic);
@ -807,7 +873,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock
if (rhizome_manifest_verify(m) != 0) {
WHY("Error verifying manifest when considering for import");
/* Don't waste time looking at this manifest again for a while */
rhizome_queue_ignore_manifest(m, peerip, 60000);
rhizome_queue_ignore_manifest(m, peerip, peersid, 60000);
rhizome_manifest_free(m);
RETURN(-1);
}
@ -841,7 +907,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock
if (!m->selfSigned && rhizome_manifest_verify(m)) {
WHY("Error verifying manifest when considering queuing for import");
/* Don't waste time looking at this manifest again for a while */
rhizome_queue_ignore_manifest(m, peerip, 60000);
rhizome_queue_ignore_manifest(m, peerip, peersid, 60000);
rhizome_manifest_free(m);
RETURN(-1);
}
@ -867,15 +933,16 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock
if (!m->selfSigned && rhizome_manifest_verify(m)) {
WHY("Error verifying manifest when considering queuing for import");
/* Don't waste time looking at this manifest again for a while */
rhizome_queue_ignore_manifest(m, peerip, 60000);
rhizome_queue_ignore_manifest(m, peerip, peersid, 60000);
rhizome_manifest_free(m);
RETURN(-1);
}
struct rhizome_fetch_candidate *c = rhizome_fetch_insert(qi, ci);
c->manifest = m;
c->peer = *peerip;
c->priority = priority;
c->peer_ipandport = *peerip;
bcopy(peersid,c->peer_sid,SID_SIZE);
if (debug & DEBUG_RHIZOME_RX) {
DEBUG("Rhizome fetch queues:");
@ -909,15 +976,16 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock
static int rhizome_fetch_close(struct rhizome_fetch_slot *slot)
{
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("close Rhizome fetch slot=%d", slotno(slot));
// if (debug & DEBUG_RHIZOME_RX)
DEBUGF("close Rhizome fetch slot=%d", slotno(slot));
assert(slot->state != RHIZOME_FETCH_FREE);
/* close socket and stop watching it */
unwatch(&slot->alarm);
unschedule(&slot->alarm);
unwatch(&slot->alarm);
close(slot->alarm.poll.fd);
slot->alarm.poll.fd = -1;
slot->alarm.function=NULL;
/* Free ephemeral data */
if (slot->file)
@ -940,14 +1008,173 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot)
return 0;
}
static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm)
{
struct rhizome_fetch_slot *slot=(struct rhizome_fetch_slot*)alarm;
if (slot->state!=5) {
DEBUGF("Stale alarm triggered on idle/reclaimed slot. Ignoring");
unschedule(alarm);
return;
}
long long now=gettime_ms();
if (now-slot->last_write_time>slot->mdpIdleTimeout) {
DEBUGF("MDP connection timed out: last RX %lldms ago",
now-slot->last_write_time);
rhizome_fetch_close(slot);
return;
}
if (debug& DEBUG_RHIZOME_RX)
DEBUGF("Timeout waiting for blocks. Resending request for slot=0x%p",
slot);
if (slot->bidP)
rhizome_fetch_mdp_requestblocks(slot);
else
rhizome_fetch_mdp_requestmanifest(slot);
}
static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot)
{
// only issue new requests every 133ms.
// we automatically re-issue once we have received all packets in this
// request also, so if there is no packet loss, we can go substantially
// faster. Optimising behaviour when there is no packet loss is an
// outstanding task.
overlay_mdp_frame mdp;
bzero(&mdp,sizeof(mdp));
bcopy(my_subscriber->sid,mdp.out.src.sid,SID_SIZE);
mdp.out.src.port=MDP_PORT_RHIZOME_RESPONSE;
bcopy(slot->peer_sid,mdp.out.dst.sid,SID_SIZE);
mdp.out.dst.port=MDP_PORT_RHIZOME_REQUEST;
mdp.out.ttl=1;
mdp.packetTypeAndFlags=MDP_TX;
mdp.out.queue=OQ_ORDINARY;
mdp.out.payload_length=RHIZOME_BAR_BYTES+8+8+4+2;
bcopy(slot->bid,&mdp.out.payload[0],RHIZOME_MANIFEST_ID_BYTES);
write_uint64(&mdp.out.payload[RHIZOME_BAR_BYTES],slot->bidVersion);
write_uint64(&mdp.out.payload[RHIZOME_BAR_BYTES+8],slot->file_ofs);
write_uint32(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8],slot->mdpRXBitmap);
write_uint16(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8+4],slot->mdpRXBlockLength);
if (0)
DEBUGF("src sid=%s, dst sid=%s, mdpRXWindowStart=0x%x",
alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid),
slot->file_ofs);
overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0);
// remember when we sent the request so that we can adjust the inter-request
// interval based on how fast the packets arrive.
slot->mdpResponsesOutstanding=32; // TODO: set according to bitmap
unschedule(&slot->alarm);
slot->alarm.function = rhizome_fetch_mdp_slot_callback;
slot->alarm.alarm=gettime_ms()+133;
slot->alarm.deadline=slot->alarm.alarm+500;
schedule(&slot->alarm);
return 0;
}
static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot)
{
if (slot->prefix_length<1||slot->prefix_length>32) {
// invalid request
DEBUGF("invalid MDP Rhizome request");
return rhizome_fetch_close(slot);
}
if ((gettime_ms()-slot->last_write_time)>slot->mdpIdleTimeout) {
// connection timed out
DEBUGF("MDP connection timedout");
return rhizome_fetch_close(slot);
}
overlay_mdp_frame mdp;
bzero(&mdp,sizeof(mdp));
bcopy(my_subscriber->sid,mdp.out.src.sid,SID_SIZE);
mdp.out.src.port=MDP_PORT_RHIZOME_RESPONSE;
bcopy(slot->peer_sid,mdp.out.dst.sid,SID_SIZE);
mdp.out.dst.port=MDP_PORT_RHIZOME_REQUEST;
mdp.out.ttl=1;
mdp.packetTypeAndFlags=MDP_TX;
mdp.out.queue=OQ_ORDINARY;
mdp.out.payload_length=slot->prefix_length;
bcopy(slot->prefix,&mdp.out.payload[0],slot->prefix_length);
overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0);
DEBUGF("Set callback function, and set alarm");
slot->alarm.function = rhizome_fetch_mdp_slot_callback;
slot->alarm.alarm=gettime_ms()+100;
slot->alarm.deadline=slot->alarm.alarm+500;
schedule(&slot->alarm);
return 0;
}
static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot)
{
DEBUGF("Trying to switch to MDP for Rhizome fetch: slot=0x%p",slot);
/* close socket and stop watching it */
unwatch(&slot->alarm);
unschedule(&slot->alarm);
if (slot->alarm.poll.fd!=-1) {
close(slot->alarm.poll.fd);
slot->alarm.poll.fd = -1;
}
/* Begin MDP fetch process.
1. Send initial request.
2. Set timeout for next request (if fetching a file).
3. Set timeout for no traffic received.
*/
slot->state=RHIZOME_FETCH_RXFILEMDP;
slot->last_write_time=gettime_ms();
if (slot->bidP) {
/* We are requesting a file. The http request may have already received
some of the file, so take that into account when setting up ring buffer.
Then send the request for the next block of data, and set our alarm to
re-ask in a little while. "In a little while" is 133ms, which is roughly
the time it takes to send 16KB via WiFi broadcast at the 1Mbit base rate
(this will need tuning for non-WiFi interfaces). 16KB ~= 32 x 200 bytes
which is the block size we will use. 200bytes allows for several blocks
to fit into a packet, and probably fit at least one any any outgoing packet
that is not otherwise full. */
slot->mdpIdleTimeout=5000; // give up if nothing received for 5 seconds
slot->mdpRXBitmap=0x00000000; // no blocks received yet
slot->mdpRXBlockLength=200; // 200;
rhizome_fetch_mdp_requestblocks(slot);
} else {
/* We are requesting a manifest, which is stateless, except that we eventually
give up. All we need to do now is send the request, and set our alarm to
try again in case we haven't heard anything back. */
slot->mdpIdleTimeout=2000; // only try for two seconds
rhizome_fetch_mdp_requestmanifest(slot);
}
return 0;
}
void rhizome_fetch_write(struct rhizome_fetch_slot *slot)
{
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("write_nonblock(%d, %s)", slot->alarm.poll.fd, alloca_toprint(-1, &slot->request[slot->request_ofs], slot->request_len-slot->request_ofs));
int bytes = write_nonblock(slot->alarm.poll.fd, &slot->request[slot->request_ofs], slot->request_len-slot->request_ofs);
if (bytes == -1) {
WHY("Got error while sending HTTP request. Closing.");
rhizome_fetch_close(slot);
WHY("Got error while sending HTTP request.");
rhizome_fetch_switch_to_mdp(slot);
return;
} else {
// reset timeout
unschedule(&slot->alarm);
@ -967,7 +1194,7 @@ void rhizome_fetch_write(struct rhizome_fetch_slot *slot)
}
}
void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int bytes)
int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int bytes)
{
if (bytes>(slot->file_len-slot->file_ofs))
bytes=slot->file_len-slot->file_ofs;
@ -975,9 +1202,10 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Failed to write %d bytes to file @ offset %d", bytes, slot->file_ofs);
rhizome_fetch_close(slot);
return;
return -1;
}
slot->file_ofs+=bytes;
slot->last_write_time=gettime_ms();
if (slot->file_ofs>=slot->file_len) {
/* got all of file */
if (debug & DEBUG_RHIZOME_RX)
@ -987,13 +1215,19 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by
if (slot->manifest) {
// Were fetching payload, now we have it.
if (!rhizome_import_received_bundle(slot->manifest)){
char buf[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, &slot->peer.sin_addr, buf, sizeof buf) == NULL) {
buf[0] = '*';
buf[1] = '\0';
if (slot->state==RHIZOME_FETCH_RXFILE) {
char buf[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, &slot->peer_ipandport.sin_addr, buf, sizeof buf) == NULL) {
buf[0] = '*';
buf[1] = '\0';
}
INFOF("Completed http request from %s:%u for file %s",
buf, ntohs(slot->peer_ipandport.sin_port),
slot->manifest->fileHexHash);
} else {
INFOF("Completed MDP request from %s for file %s",
alloca_tohex_sid(slot->peer_sid), slot->manifest->fileHexHash);
}
INFOF("Completed http request from %s:%u for file %s",
buf, ntohs(slot->peer.sin_port), slot->manifest->fileHexHash);
}
} else {
/* This was to fetch the manifest, so now fetch the file if needed */
@ -1007,19 +1241,76 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by
rhizome_manifest_free(m);
} else {
DEBUGF("All looks good for importing manifest id=%s", alloca_tohex_bid(m->cryptoSignPublic));
dump("slot->peer",&slot->peer,sizeof(slot->peer));
rhizome_suggest_queue_manifest_import(m, &slot->peer);
dump("slot->peerip",&slot->peer_ipandport,sizeof(slot->peer_ipandport));
dump("slot->peersid",&slot->peer_sid,sizeof(slot->peer_sid));
rhizome_suggest_queue_manifest_import(m, &slot->peer_ipandport,
slot->peer_sid);
}
}
}
DEBUGF("Closing rhizome fetch slot = 0x%p",slot);
rhizome_fetch_close(slot);
return;
return -1;
}
// reset inactivity timeout
unschedule(&slot->alarm);
slot->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT;
slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
schedule(&slot->alarm);
// slot is still open
return 0;
}
int rhizome_received_content(unsigned char *bidprefix,
uint64_t version, uint64_t offset,
int count,unsigned char *bytes,int type)
{
IN();
int i;
for(i=0;i<NQUEUES;i++) {
struct rhizome_fetch_slot *slot=&rhizome_fetch_queues[i].active;
if (slot->state==RHIZOME_FETCH_RXFILEMDP&&slot->bidP) {
if (!bcmp(slot->bid,bidprefix,16))
{
if (slot->file_ofs==offset) {
/* We don't know the file length until we receive the last
block. If it isn't the last block, lie, and claim the end of
file is yet to come. */
if (type=='T') slot->file_len=offset+count;
else {
slot->file_len=offset+count+1;
}
if (!rhizome_write_content(slot,(char *)bytes,count))
{
slot->mdpResponsesOutstanding--;
if (slot->mdpResponsesOutstanding==0) {
// We have received all responses, so immediately ask for more
rhizome_fetch_mdp_requestblocks(slot);
}
// TODO: Try flushing out stuck packets that we have kept due to
// packet loss / out-of-order delivery.
}
RETURN(0);
} else {
// TODO: Implement out-of-order buffering so that lost packets
// don't cause wastage
}
RETURN(0);
}
else
if (1)
DEBUGF("Doesn't match this slot = 0x%p, because BIDs don't match: %s* vs %s",
alloca_tohex(bidprefix,16),
alloca_tohex_bid(rhizome_fetch_queues[i].active.bid));
}
}
RETURN(-1);
}
void rhizome_fetch_poll(struct sched_ent *alarm)
@ -1028,88 +1319,87 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
if (alarm->poll.revents & (POLLIN | POLLOUT)) {
switch (slot->state) {
case RHIZOME_FETCH_CONNECTING:
case RHIZOME_FETCH_SENDINGHTTPREQUEST:
rhizome_fetch_write(slot);
case RHIZOME_FETCH_CONNECTING:
case RHIZOME_FETCH_SENDINGHTTPREQUEST:
rhizome_fetch_write(slot);
return;
case RHIZOME_FETCH_RXFILE: {
/* Keep reading until we have the promised amount of data */
char buffer[8192];
sigPipeFlag = 0;
int bytes = read_nonblock(slot->alarm.poll.fd, buffer, sizeof buffer);
/* If we got some data, see if we have found the end of the HTTP request */
if (bytes > 0) {
rhizome_write_content(slot, buffer, bytes);
return;
case RHIZOME_FETCH_RXFILE: {
/* Keep reading until we have the promised amount of data */
char buffer[8192];
sigPipeFlag = 0;
int bytes = read_nonblock(slot->alarm.poll.fd, buffer, sizeof buffer);
/* If we got some data, see if we have found the end of the HTTP request */
if (bytes > 0) {
rhizome_write_content(slot, buffer, bytes);
return;
} else {
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Empty read, closing connection");
rhizome_fetch_close(slot);
return;
}
if (sigPipeFlag) {
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Received SIGPIPE, closing connection");
rhizome_fetch_close(slot);
return;
}
}
break;
case RHIZOME_FETCH_RXHTTPHEADERS: {
/* Keep reading until we have two CR/LFs in a row */
sigPipeFlag = 0;
int bytes = read_nonblock(slot->alarm.poll.fd, &slot->request[slot->request_len], 1024 - slot->request_len - 1);
/* If we got some data, see if we have found the end of the HTTP reply */
if (bytes > 0) {
// reset timeout
unschedule(&slot->alarm);
slot->alarm.alarm = gettime_ms() + RHIZOME_IDLE_TIMEOUT;
slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
schedule(&slot->alarm);
slot->request_len += bytes;
if (http_header_complete(slot->request, slot->request_len, bytes)) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Got HTTP reply: %s", alloca_toprint(160, slot->request, slot->request_len));
/* We have all the reply headers, so parse them, taking care of any following bytes of
content. */
struct http_response_parts parts;
if (unpack_http_response(slot->request, &parts) == -1) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Failed HTTP request: failed to unpack http response");
rhizome_fetch_close(slot);
return;
}
if (parts.code != 200) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Failed HTTP request: rhizome server returned %d != 200 OK", parts.code);
rhizome_fetch_close(slot);
return;
}
if (parts.content_length == -1) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Invalid HTTP reply: missing Content-Length header");
rhizome_fetch_close(slot);
return;
}
slot->file_len = parts.content_length;
/* We have all we need. The file is already open, so just write out any initial bytes of
the body we read.
*/
slot->state = RHIZOME_FETCH_RXFILE;
int content_bytes = slot->request + slot->request_len - parts.content_start;
if (content_bytes > 0){
rhizome_write_content(slot, parts.content_start, content_bytes);
return;
}
}
}
break;
default:
} else {
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Empty read, closing connection");
rhizome_fetch_switch_to_mdp(slot);
return;
}
if (sigPipeFlag) {
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Received SIGPIPE, closing connection");
rhizome_fetch_switch_to_mdp(slot);
return;
}
}
break;
case RHIZOME_FETCH_RXHTTPHEADERS: {
/* Keep reading until we have two CR/LFs in a row */
sigPipeFlag = 0;
int bytes = read_nonblock(slot->alarm.poll.fd, &slot->request[slot->request_len], 1024 - slot->request_len - 1);
/* If we got some data, see if we have found the end of the HTTP reply */
if (bytes > 0) {
// reset timeout
unschedule(&slot->alarm);
slot->alarm.alarm = gettime_ms() + RHIZOME_IDLE_TIMEOUT;
slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
schedule(&slot->alarm);
slot->request_len += bytes;
if (http_header_complete(slot->request, slot->request_len, bytes)) {
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Closing rhizome fetch connection due to illegal/unimplemented state.");
rhizome_fetch_close(slot);
return;
DEBUGF("Got HTTP reply: %s", alloca_toprint(160, slot->request, slot->request_len));
/* We have all the reply headers, so parse them, taking care of any following bytes of
content. */
struct http_response_parts parts;
if (unpack_http_response(slot->request, &parts) == -1) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Failed HTTP request: failed to unpack http response");
rhizome_fetch_switch_to_mdp(slot);
return;
}
if (parts.code != 200) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Failed HTTP request: rhizome server returned %d != 200 OK", parts.code);
rhizome_fetch_switch_to_mdp(slot);
return;
}
if (parts.content_length == -1) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Invalid HTTP reply: missing Content-Length header");
rhizome_fetch_switch_to_mdp(slot);
return;
}
slot->file_len = parts.content_length;
/* We have all we need. The file is already open, so just write out any initial bytes of
the body we read.
*/
slot->state = RHIZOME_FETCH_RXFILE;
int content_bytes = slot->request + slot->request_len - parts.content_start;
if (content_bytes > 0){
rhizome_write_content(slot, parts.content_start, content_bytes);
return;
}
}
}
break;
default:
WARNF("Closing rhizome fetch connection due to illegal/unimplemented state=%d.",slot->state);
rhizome_fetch_close(slot);
return;
}
}
}
if (alarm->poll.revents==0 || alarm->poll.revents & (POLLHUP | POLLERR)){

View File

@ -72,7 +72,7 @@ unsigned char favicon_bytes[]={
,0,0,0,0,0,0,0,0,0,0,0,0,0,0};
int favicon_len=318;
int rhizome_http_server_running()
int is_rhizome_http_server_running()
{
return rhizome_server_socket != -1;
}

View File

@ -145,7 +145,7 @@ int overlay_rhizome_add_advertisements(struct decode_context *context, int inter
XXX We will move all processing of Rhizome into a separate process
so that the CPU delays caused by Rhizome verifying signatures isn't a problem.
*/
if (!rhizome_http_server_running() || !rhizome_db)
if (!is_rhizome_advertise_enabled())
RETURN(0);
int pass;
@ -409,7 +409,7 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long
RETURN(0);
}
if (rhizome_ignore_manifest_check(m, &httpaddr))
if (rhizome_ignore_manifest_check(m, &httpaddr,f->source->sid))
{
/* Ignoring manifest that has caused us problems recently */
if (1) WARNF("Ignoring manifest with errors: %s*", manifest_id_prefix);
@ -424,7 +424,7 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long
} else {
if (debug & DEBUG_RHIZOME_ADS)
DEBUG("Not seen before.");
rhizome_suggest_queue_manifest_import(m, &httpaddr);
rhizome_suggest_queue_manifest_import(m, &httpaddr,f->source->sid);
// the above function will free the manifest structure, make sure we don't free it again
m=NULL;
}
@ -435,7 +435,7 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long
DEBUG("Unverified manifest has errors - so not processing any further.");
/* Don't waste any time on this manifest in future attempts for at least
a minute. */
rhizome_queue_ignore_manifest(m, &httpaddr, 60000);
rhizome_queue_ignore_manifest(m, &httpaddr,f->source->sid, 60000);
}
if (m) {
rhizome_manifest_free(m);

View File

@ -738,6 +738,7 @@ void overlay_dummy_poll(struct sched_ent *alarm);
void overlay_route_tick(struct sched_ent *alarm);
void server_shutdown_check(struct sched_ent *alarm);
void overlay_mdp_poll(struct sched_ent *alarm);
int overlay_mdp_try_interal_services(overlay_mdp_frame *mdp);
void fd_periodicstats(struct sched_ent *alarm);
void rhizome_check_connections(struct sched_ent *alarm);
@ -770,4 +771,11 @@ void dump_stack();
int olsr_init_socket(void);
int olsr_send(struct overlay_frame *frame);
void write_uint64(unsigned char *o,uint64_t v);
void write_uint16(unsigned char *o,uint16_t v);
void write_uint32(unsigned char *o,uint32_t v);
uint64_t read_uint64(unsigned char *o);
uint32_t read_uint32(unsigned char *o);
uint16_t read_uint16(unsigned char *o);
#endif // __SERVALD_SERVALD_H

View File

@ -28,6 +28,7 @@ SERVAL_SOURCES = $(SERVAL_BASE)audiodevices.c \
$(SERVAL_BASE)overlay_interface.c \
$(SERVAL_BASE)overlay_queue.c \
$(SERVAL_BASE)overlay_mdp.c \
$(SERVAL_BASE)overlay_mdp_services.c \
$(SERVAL_BASE)overlay_olsr.c \
$(SERVAL_BASE)overlay_packetformats.c \
$(SERVAL_BASE)overlay_payload.c \

View File

@ -89,10 +89,102 @@ test_FileTransfer() {
assert_rhizome_received file2
}
doc_FileTransferBig="Big new bundle transfers to one node"
setup_FileTransferBig() {
doc_DisablingHTTPServer="Disabling HTTP rhizome transports works"
setup_DisablingHTTPServer() {
setup_common
set_instance +A
rhizome_add_file file1
executeOk_servald config set rhizome.http.enable 0
start_servald_instances +A
}
test_DisablingHTTPServer() {
!rhizome_http_server_started
}
doc_HTTPTransport="Rhizome over HTTP transport"
setup_HTTPTransport() {
setup_common
set_instance +B
executeOk_servald config set rhizome.mdp.enable 0
set_instance +A
executeOk_servald config set rhizome.mdp.enable 0
rhizome_add_file file1
start_servald_instances +A +B
foreach_instance +A assert_peers_are_instances +B
foreach_instance +B assert_peers_are_instances +A
}
test_HTTPTransport() {
wait_until bundle_received_by $BID:$VERSION +B
set_instance +B
executeOk_servald rhizome list ''
assert_rhizome_list --fromhere=0 file1
assert_rhizome_received file1
set_instance +A
rhizome_update_file file1 file2
set_instance +B
wait_until bundle_received_by $BID:$VERSION +B
executeOk_servald rhizome list ''
assert_rhizome_list --fromhere=0 file2
assert_rhizome_received file2
}
doc_MDPTransport="Rhizome over MDP transport"
setup_MDPTransport() {
setup_common
set_instance +B
executeOk_servald config set rhizome.http.enable 0
set_instance +A
executeOk_servald config set rhizome.http.enable 0
rhizome_add_file file1
start_servald_instances +A +B
foreach_instance +A assert_peers_are_instances +B
foreach_instance +B assert_peers_are_instances +A
}
test_MDPTransport() {
wait_until bundle_received_by $BID:$VERSION +B
set_instance +B
executeOk_servald rhizome list ''
assert_rhizome_list --fromhere=0 file1
assert_rhizome_received file1
set_instance +A
rhizome_update_file file1 file2
set_instance +B
wait_until bundle_received_by $BID:$VERSION +B
executeOk_servald rhizome list ''
assert_rhizome_list --fromhere=0 file2
assert_rhizome_received file2
}
doc_FileTransferBigMDP="Big new bundle transfers to one node via MDP"
setup_FileTransferBigMDP() {
setup_common
set_instance +B
executeOk_servald config set rhizome.http.enable 0
set_instance +A
executeOk_servald config set rhizome.http.enable 0
dd if=/dev/urandom of=file1 bs=1k count=1k 2>&1
echo x >>file1
ls -l file1
rhizome_add_file file1
start_servald_instances +A +B
foreach_instance +A assert_peers_are_instances +B
foreach_instance +B assert_peers_are_instances +A
}
test_FileTransferBigMDP() {
wait_until bundle_received_by $BID:$VERSION +B
set_instance +B
executeOk_servald rhizome list ''
assert_rhizome_list --fromhere=0 file1
assert_rhizome_received file1
}
doc_FileTransferBig="Big new bundle transfers to one node via HTTP"
setup_FileTransferBig() {
setup_common
set_instance +B
executeOk_servald config set rhizome.mdp.enable 0
set_instance +A
executeOk_servald config set rhizome.mdp.enable 0
dd if=/dev/urandom of=file1 bs=1k count=1k 2>&1
echo x >>file1
ls -l file1
@ -109,10 +201,20 @@ test_FileTransferBig() {
assert_rhizome_received file1
}
doc_FileTransferMulti="New bundle transfers to four nodes"
doc_FileTransferMulti="New bundle transfers to four nodes via HTTP"
setup_FileTransferMulti() {
setup_common
set_instance +A
executeOk_servald config set rhizome.mdp.enable 0
set_instance +B
executeOk_servald config set rhizome.mdp.enable 0
set_instance +C
executeOk_servald config set rhizome.mdp.enable 0
set_instance +D
executeOk_servald config set rhizome.mdp.enable 0
set_instance +E
executeOk_servald config set rhizome.mdp.enable 0
set_instance +A
rhizome_add_file file1
start_servald_instances +A +B +C +D +E
foreach_instance +A assert_peers_are_instances +B +C +D +E
@ -130,6 +232,38 @@ test_FileTransferMulti() {
done
}
doc_FileTransferMultiMDP="New bundle transfers to four nodes via MDP"
setup_FileTransferMultiMDP() {
setup_common
set_instance +A
executeOk_servald config set rhizome.http.enable 0
set_instance +B
executeOk_servald config set rhizome.http.enable 0
set_instance +C
executeOk_servald config set rhizome.http.enable 0
set_instance +D
executeOk_servald config set rhizome.http.enable 0
set_instance +E
executeOk_servald config set rhizome.http.enable 0
set_instance +A
rhizome_add_file file1
start_servald_instances +A +B +C +D +E
foreach_instance +A assert_peers_are_instances +B +C +D +E
foreach_instance +B assert_peers_are_instances +A +C +D +E
foreach_instance +C assert_peers_are_instances +A +B +D +E
foreach_instance +D assert_peers_are_instances +A +B +C +E
}
test_FileTransferMultiMDP() {
wait_until bundle_received_by $BID:$VERSION +B +C +D +E
for i in B C D E; do
set_instance +$i
executeOk_servald rhizome list ''
assert_rhizome_list --fromhere=0 file1
assert_rhizome_received file1
done
}
doc_FileTransferDelete="Payload deletion transfers to one node"
setup_FileTransferDelete() {
setup_common