Pass SID as well as ip:port through rhizome fetch request logic.

This is in preparation for rhizome over MDP.
This commit is contained in:
gardners 2012-11-29 15:38:04 +10:30
parent c4b90a108b
commit faad1f26b1
4 changed files with 106 additions and 46 deletions

View File

@ -326,8 +326,8 @@ int rhizome_sign_hash_with_key(rhizome_manifest *m,const unsigned char *sk,
int rhizome_verify_bundle_privatekey(rhizome_manifest *m, const unsigned char *sk,
const unsigned char *pk);
int rhizome_find_bundle_author(rhizome_manifest *m);
int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, int timeout);
int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip);
int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, const unsigned char peersid[SID_SIZE], int timeout);
int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE]);
/* one manifest is required per candidate, plus a few spare.
so MAX_RHIZOME_MANIFESTS must be > MAX_CANDIDATES.
@ -335,7 +335,7 @@ int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in
#define MAX_RHIZOME_MANIFESTS 24
#define MAX_CANDIDATES 16
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip);
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE]);
typedef struct rhizome_http_request {
struct sched_ent alarm;
@ -552,7 +552,7 @@ enum rhizome_start_fetch_result {
SLOTBUSY
};
enum rhizome_start_fetch_result rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length);
enum rhizome_start_fetch_result rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char sid[SID_SIZE],const unsigned char *prefix, size_t prefix_length);
int rhizome_any_fetch_active();
int rhizome_any_fetch_queued();

View File

@ -731,6 +731,8 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r)
DEBUGF("Dispatch size_high=%lld",r->cursor->size_high);
rhizome_direct_transport_state_http *state = r->transport_specific_state;
unsigned char zerosid[SID_SIZE]="\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0";
int sock=socket(AF_INET, SOCK_STREAM, 0);
if (sock==-1) {
WHY_perror("socket");
@ -876,7 +878,7 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r)
Then as noted above, we can use that to pull the file down using
existing routines.
*/
if (!rhizome_fetch_request_manifest_by_prefix(&addr,&actionlist[i+1],RHIZOME_BAR_PREFIX_BYTES))
if (!rhizome_fetch_request_manifest_by_prefix(&addr,zerosid,&actionlist[i+1],RHIZOME_BAR_PREFIX_BYTES))
{
/* Fetching the manifest, and then using it to see if we want to
fetch the file for import is all handled asynchronously, so just

View File

@ -29,7 +29,13 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
struct rhizome_fetch_candidate {
rhizome_manifest *manifest;
struct sockaddr_in peer;
/* Address of node offering manifest.
Can be either IP+port for HTTP or it can be a SID
for MDP. */
struct sockaddr_in peer_ipandport;
unsigned char peer_sid[SID_SIZE];
int priority;
};
@ -39,20 +45,32 @@ struct rhizome_fetch_candidate {
struct rhizome_fetch_slot {
struct sched_ent alarm; // must be first element in struct
rhizome_manifest *manifest;
struct sockaddr_in peer;
struct sockaddr_in peer_ipandport;
unsigned char peer_sid[SID_SIZE];
int state;
#define RHIZOME_FETCH_FREE 0
#define RHIZOME_FETCH_CONNECTING 1
#define RHIZOME_FETCH_SENDINGHTTPREQUEST 2
#define RHIZOME_FETCH_RXHTTPHEADERS 3
#define RHIZOME_FETCH_RXFILE 4
#define RHIZOME_FETCH_RXFILEMDP 5
FILE *file;
char filename[1024];
int64_t file_len;
int64_t file_ofs;
/* HTTP transport specific elements */
char request[1024];
int request_len;
int request_ofs;
int64_t file_len;
int64_t file_ofs;
/* MDP transport specific elements */
unsigned char bar[RHIZOME_BAR_BYTES];
int barP;
unsigned char prefix[RHIZOME_MANIFEST_ID_BYTES];
int prefix_length;
};
/* Represents a queue of fetch candidates and a single active fetch for bundle payloads whose size
@ -380,7 +398,8 @@ int rhizome_manifest_version_cache_lookup(rhizome_manifest *m)
typedef struct ignored_manifest {
unsigned char bid[crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES];
struct sockaddr_in peer;
struct sockaddr_in peer_ipandport;
unsigned char peer_sid[SID_SIZE];
time_ms_t timeout;
} ignored_manifest;
@ -400,7 +419,7 @@ typedef struct ignored_manifest_cache {
a collision is exceedingly remote */
ignored_manifest_cache ignored;
int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip)
int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char *peersid)
{
int bin = m->cryptoSignPublic[0]>>(8-IGNORED_BIN_BITS);
int slot;
@ -419,7 +438,7 @@ int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in
return 0;
}
int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, int timeout)
int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, const unsigned char peersid[SID_SIZE], int timeout)
{
/* The supplied manifest from a given IP has errors, so remember
that it isn't worth considering */
@ -439,8 +458,11 @@ int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in
/* ignore for a while */
ignored.bins[bin].m[slot].timeout=gettime_ms()+timeout;
bcopy(peerip,
&ignored.bins[bin].m[slot].peer,
&ignored.bins[bin].m[slot].peer_ipandport,
sizeof(struct sockaddr_in));
bcopy(peersid,
ignored.bins[bin].m[slot].peer_sid,
SID_SIZE);
return 0;
}
@ -468,31 +490,35 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
WHYF_perror("fopen(`%s`, \"w\")", slot->filename);
goto bail;
}
if (slot->peer.sin_family == AF_INET) {
if (slot->peer_ipandport.sin_family == AF_INET) {
/* Transfer via HTTP over IPv4 */
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
WHY_perror("socket");
goto bail;
goto bail_http;
}
if (set_nonblock(sock) == -1)
goto bail;
goto bail_http;
char buf[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, &slot->peer.sin_addr, buf, sizeof buf) == NULL) {
if (inet_ntop(AF_INET, &slot->peer_ipandport.sin_addr, buf, sizeof buf) == NULL) {
buf[0] = '*';
buf[1] = '\0';
}
if (connect(sock, (struct sockaddr*)&slot->peer, sizeof slot->peer) == -1) {
if (connect(sock, (struct sockaddr*)&slot->peer_ipandport,
sizeof slot->peer_ipandport) == -1) {
if (errno == EINPROGRESS) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("connect() returned EINPROGRESS");
} else {
WHYF_perror("connect(%d, %s:%u)", sock, buf, ntohs(slot->peer.sin_port));
goto bail;
WHYF_perror("connect(%d, %s:%u)", sock, buf,
ntohs(slot->peer_ipandport.sin_port));
goto bail_http;
}
}
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("RHIZOME HTTP REQUEST family=%u addr=%s port=%u %s",
slot->peer.sin_family, buf, ntohs(slot->peer.sin_port), alloca_str_toprint(slot->request)
DEBUGF("RHIZOME HTTP REQUEST family=%u addr=%s sid=%s port=%u %s",
slot->peer_ipandport.sin_family,
alloca_tohex_sid(slot->peer_sid),
buf, ntohs(slot->peer_ipandport.sin_port), alloca_str_toprint(slot->request)
);
slot->alarm.poll.fd = sock;
slot->request_ofs = 0;
@ -511,10 +537,14 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
schedule(&slot->alarm);
return 0;
} else {
/* TODO: Fetch via overlay */
WHY("Rhizome fetching via overlay not implemented");
}
bail_http:
/* Fetch via overlay, either because no IP address was provided, or because
the connection/attempt to fetch via HTTP failed. */
WHY("Rhizome fetching via overlay not implemented");
slot->state=RHIZOME_FETCH_RXFILEMDP;
bail:
if (sock != -1)
close(sock);
@ -565,7 +595,7 @@ bail:
* @author Andrew Bettison <andrew@servalproject.com>
*/
static enum rhizome_start_fetch_result
rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct sockaddr_in *peerip)
rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct sockaddr_in *peerip,unsigned const char *peersid)
{
if (slot->state != RHIZOME_FETCH_FREE)
return SLOTBUSY;
@ -667,12 +697,20 @@ rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct
// Start the fetch.
//dump("peerip", peerip, sizeof *peerip);
slot->peer = *peerip;
/* Prepare for fetching via HTTP */
slot->peer_ipandport = *peerip;
strbuf r = strbuf_local(slot->request, sizeof slot->request);
strbuf_sprintf(r, "GET /rhizome/file/%s HTTP/1.0\r\n\r\n", m->fileHexHash);
if (strbuf_overrun(r))
return WHY("request overrun");
slot->request_len = strbuf_len(r);
/* Prepare for fetching via MDP */
bcopy(peersid,slot->peer_sid,SID_SIZE);
rhizome_manifest_to_bar(m,slot->bar);
slot->barP=1;
if (!FORM_RHIZOME_IMPORT_PATH(slot->filename, "payload.%s", bid))
return -1;
m->dataFileName = strdup(slot->filename);
@ -692,19 +730,30 @@ rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct
* Returns -1 on error.
*/
enum rhizome_start_fetch_result
rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length)
rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip,
const unsigned char peersid[SID_SIZE],
const unsigned char *prefix, size_t prefix_length)
{
assert(peerip);
struct rhizome_fetch_slot *slot = rhizome_find_fetch_slot(MAX_MANIFEST_BYTES);
if (slot == NULL)
return SLOTBUSY;
slot->peer = *peerip;
/* Prepare for fetching via HTTP */
slot->peer_ipandport = *peerip;
slot->manifest = NULL;
strbuf r = strbuf_local(slot->request, sizeof slot->request);
strbuf_sprintf(r, "GET /rhizome/manifestbyprefix/%s HTTP/1.0\r\n\r\n", alloca_tohex(prefix, prefix_length));
if (strbuf_overrun(r))
return WHY("request overrun");
slot->request_len = strbuf_len(r);
/* Prepare for fetching via MDP */
bcopy(peersid,slot->peer_sid,SID_SIZE);
bcopy(prefix,slot->prefix,prefix_length);
slot->prefix_length=prefix_length;
slot->barP=0;
if (!FORM_RHIZOME_IMPORT_PATH(slot->filename, "manifest.%s", alloca_tohex(prefix, prefix_length)))
return -1;
if (schedule_fetch(slot) == -1) {
@ -726,7 +775,7 @@ static void rhizome_start_next_queued_fetch(struct rhizome_fetch_slot *slot)
int i = 0;
struct rhizome_fetch_candidate *c;
while (i < q->candidate_queue_size && (c = &q->candidate_queue[i])->manifest) {
int result = rhizome_fetch(slot, c->manifest, &c->peer);
int result = rhizome_fetch(slot, c->manifest, &c->peer_ipandport,c->peer_sid);
switch (result) {
case SLOTBUSY:
return;
@ -781,7 +830,7 @@ static void rhizome_start_next_queued_fetches(struct sched_ent *alarm)
*
* @author Andrew Bettison <andrew@servalproject.com>
*/
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip)
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE])
{
IN();
const char *bid = alloca_tohex_bid(m->cryptoSignPublic);
@ -807,7 +856,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock
if (rhizome_manifest_verify(m) != 0) {
WHY("Error verifying manifest when considering for import");
/* Don't waste time looking at this manifest again for a while */
rhizome_queue_ignore_manifest(m, peerip, 60000);
rhizome_queue_ignore_manifest(m, peerip, peersid, 60000);
rhizome_manifest_free(m);
RETURN(-1);
}
@ -841,7 +890,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock
if (!m->selfSigned && rhizome_manifest_verify(m)) {
WHY("Error verifying manifest when considering queuing for import");
/* Don't waste time looking at this manifest again for a while */
rhizome_queue_ignore_manifest(m, peerip, 60000);
rhizome_queue_ignore_manifest(m, peerip, peersid, 60000);
rhizome_manifest_free(m);
RETURN(-1);
}
@ -867,15 +916,16 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock
if (!m->selfSigned && rhizome_manifest_verify(m)) {
WHY("Error verifying manifest when considering queuing for import");
/* Don't waste time looking at this manifest again for a while */
rhizome_queue_ignore_manifest(m, peerip, 60000);
rhizome_queue_ignore_manifest(m, peerip, peersid, 60000);
rhizome_manifest_free(m);
RETURN(-1);
}
struct rhizome_fetch_candidate *c = rhizome_fetch_insert(qi, ci);
c->manifest = m;
c->peer = *peerip;
c->priority = priority;
c->peer_ipandport = *peerip;
bcopy(peersid,c->peer_sid,SID_SIZE);
if (debug & DEBUG_RHIZOME_RX) {
DEBUG("Rhizome fetch queues:");
@ -987,13 +1037,19 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by
if (slot->manifest) {
// Were fetching payload, now we have it.
if (!rhizome_import_received_bundle(slot->manifest)){
char buf[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, &slot->peer.sin_addr, buf, sizeof buf) == NULL) {
buf[0] = '*';
buf[1] = '\0';
if (slot->state==RHIZOME_FETCH_RXFILE) {
char buf[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, &slot->peer_ipandport.sin_addr, buf, sizeof buf) == NULL) {
buf[0] = '*';
buf[1] = '\0';
}
INFOF("Completed http request from %s:%u for file %s",
buf, ntohs(slot->peer_ipandport.sin_port),
slot->manifest->fileHexHash);
} else {
INFOF("Completed MDP request from %s for file %s",
alloca_tohex_sid(slot->peer_sid), slot->manifest->fileHexHash);
}
INFOF("Completed http request from %s:%u for file %s",
buf, ntohs(slot->peer.sin_port), slot->manifest->fileHexHash);
}
} else {
/* This was to fetch the manifest, so now fetch the file if needed */
@ -1007,8 +1063,10 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by
rhizome_manifest_free(m);
} else {
DEBUGF("All looks good for importing manifest id=%s", alloca_tohex_bid(m->cryptoSignPublic));
dump("slot->peer",&slot->peer,sizeof(slot->peer));
rhizome_suggest_queue_manifest_import(m, &slot->peer);
dump("slot->peerip",&slot->peer_ipandport,sizeof(slot->peer_ipandport));
dump("slot->peersid",&slot->peer_sid,sizeof(slot->peer_sid));
rhizome_suggest_queue_manifest_import(m, &slot->peer_ipandport,
slot->peer_sid);
}
}
}

View File

@ -416,7 +416,7 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long
RETURN(0);
}
if (rhizome_ignore_manifest_check(m, &httpaddr))
if (rhizome_ignore_manifest_check(m, &httpaddr,f->source->sid))
{
/* Ignoring manifest that has caused us problems recently */
if (1) WARNF("Ignoring manifest with errors: %s*", manifest_id_prefix);
@ -431,7 +431,7 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long
} else {
if (debug & DEBUG_RHIZOME_ADS)
DEBUG("Not seen before.");
rhizome_suggest_queue_manifest_import(m, &httpaddr);
rhizome_suggest_queue_manifest_import(m, &httpaddr,f->source->sid);
// the above function will free the manifest structure, make sure we don't free it again
m=NULL;
}
@ -442,7 +442,7 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long
DEBUG("Unverified manifest has errors - so not processing any further.");
/* Don't waste any time on this manifest in future attempts for at least
a minute. */
rhizome_queue_ignore_manifest(m, &httpaddr, 60000);
rhizome_queue_ignore_manifest(m, &httpaddr,f->source->sid, 60000);
}
if (m) {
rhizome_manifest_free(m);