Replace internal usage of overlay_mdp_dispatch

This commit is contained in:
Jeremy Lakeman 2014-02-05 15:26:56 +10:30
parent 7ed6be3690
commit b56f4c27d3
16 changed files with 268 additions and 280 deletions

View File

@ -769,17 +769,17 @@ int app_dna_lookup(const struct cli_parsed *parsed, struct cli_context *context)
WHYF(" Error message: %s", rx.error.message);
} else if ((rx.packetTypeAndFlags&MDP_TYPE_MASK)==MDP_TX) {
/* Extract DID, Name, URI from response. */
if (strlen((char *)rx.in.payload)<512) {
if (strlen((char *)rx.out.payload)<512) {
char sidhex[SID_STRLEN + 1];
char did[DID_MAXSIZE + 1];
char name[64];
char uri[512];
if ( !parseDnaReply((char *)rx.in.payload, rx.in.payload_length, sidhex, did, name, uri, NULL)
if ( !parseDnaReply((char *)rx.out.payload, rx.out.payload_length, sidhex, did, name, uri, NULL)
|| !str_is_subscriber_id(sidhex)
|| !str_is_did(did)
|| !str_is_uri(uri)
) {
WHYF("Received malformed DNA reply: %s", alloca_toprint(160, (const char *)rx.in.payload, rx.in.payload_length));
WHYF("Received malformed DNA reply: %s", alloca_toprint(160, (const char *)rx.out.payload, rx.out.payload_length));
} else {
/* Have we seen this response before? */
int i;
@ -2884,8 +2884,8 @@ int app_reverse_lookup(const struct cli_parsed *parsed, struct cli_context *cont
}
// we might receive a late response from an ealier request on the same socket, ignore it
if (cmp_sid_t(&mdp_reply.in.src.sid, &dstsid) != 0) {
WHYF("Unexpected result from SID %s", alloca_tohex_sid_t(mdp_reply.in.src.sid));
if (cmp_sid_t(&mdp_reply.out.src.sid, &dstsid) != 0) {
WHYF("Unexpected result from SID %s", alloca_tohex_sid_t(mdp_reply.out.src.sid));
continue;
}
@ -2894,13 +2894,13 @@ int app_reverse_lookup(const struct cli_parsed *parsed, struct cli_context *cont
char did[DID_MAXSIZE + 1];
char name[64];
char uri[512];
if ( !parseDnaReply((char *)mdp_reply.in.payload, mdp_reply.in.payload_length, sidhex, did, name, uri, NULL)
if ( !parseDnaReply((char *)mdp_reply.out.payload, mdp_reply.out.payload_length, sidhex, did, name, uri, NULL)
|| !str_is_subscriber_id(sidhex)
|| !str_is_did(did)
|| !str_is_uri(uri)
) {
WHYF("Received malformed DNA reply: %s",
alloca_toprint(160, (const char *)mdp_reply.in.payload, mdp_reply.in.payload_length));
alloca_toprint(160, (const char *)mdp_reply.out.payload, mdp_reply.out.payload_length));
continue;
}

View File

@ -33,6 +33,8 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "serval.h"
#include "str.h"
#include "overlay_address.h"
#include "overlay_packet.h"
#include "overlay_buffer.h"
#include "conf.h"
#include "keyring.h"
@ -51,26 +53,25 @@ struct sched_ent directory_alarm={
#define DIRECTORY_UPDATE_INTERVAL 120000
// send a registration packet
static void directory_send(struct subscriber *directory_service, const sid_t *sidp, const char *did, const char *name)
static void directory_send(struct subscriber *directory_service, struct subscriber *source, const char *did, const char *name)
{
overlay_mdp_frame request;
memset(&request, 0, sizeof(overlay_mdp_frame));
request.packetTypeAndFlags = MDP_TX;
request.out.src.sid = *sidp;
request.out.src.port=MDP_PORT_NOREPLY;
request.out.queue=OQ_ORDINARY;
request.out.dst.sid = directory_service->sid;
request.out.dst.port=MDP_PORT_DIRECTORY;
request.out.payload_length = snprintf((char *)request.out.payload, sizeof(request.out.payload),
"%s|%s", did, name);
// Used by tests
INFOF("Sending directory registration for %s*, %s, %s to %s*",
alloca_tohex_sid_t_trunc(*sidp, 14), did, name, alloca_tohex_sid_t_trunc(directory_service->sid, 14));
overlay_mdp_dispatch(&request, NULL);
alloca_tohex_sid_t_trunc(source->sid, 14), did, name, alloca_tohex_sid_t_trunc(directory_service->sid, 14));
struct internal_mdp_header header;
bzero(&header, sizeof header);
header.source = source;
header.source_port = MDP_PORT_NOREPLY;
header.destination = directory_service;
header.destination_port = MDP_PORT_DIRECTORY;
header.qos = OQ_ORDINARY;
char buff[256];
struct overlay_buffer *payload = ob_static((unsigned char*)buff, sizeof buff);
ob_limitsize(payload, snprintf(buff, sizeof buff, "%s|%s", did, name));
overlay_send_frame(&header, payload);
ob_free(payload);
}
// send a registration packet for each unlocked identity
@ -78,18 +79,10 @@ static void directory_send_keyring(struct subscriber *directory_service){
unsigned cn=0, in=0, kp=0;
for (; !keyring_sanitise_position(keyring, &cn, &in, &kp); ++kp){
keyring_identity *i = keyring->contexts[cn]->identities[in];
if (i->keypairs[kp]->type == KEYTYPE_CRYPTOBOX){
const sid_t *sidp = (const sid_t *) i->keypairs[0]->public_key;
unsigned k2;
for(k2=0; k2 < i->keypair_count; k2++){
if (i->keypairs[k2]->type==KEYTYPE_DID){
const char *unpackedDid = (const char *) i->keypairs[k2]->private_key;
const char *name = (const char *) i->keypairs[k2]->public_key;
directory_send(directory_service, sidp, unpackedDid, name);
// send the first DID only
break;
}
}
if (i->subscriber && i->keypairs[kp]->type == KEYTYPE_DID){
const char *unpackedDid = (const char *) i->keypairs[kp]->private_key;
const char *name = (const char *) i->keypairs[kp]->public_key;
directory_send(directory_service, i->subscriber, unpackedDid, name);
}
}
}

View File

@ -104,15 +104,15 @@ static void add_record(int mdp_sockfd){
}
// make sure the payload is a NULL terminated string
mdp.in.payload[mdp.in.payload_length]=0;
mdp.out.payload[mdp.out.payload_length]=0;
char *did=(char *)mdp.in.payload;
char *did=(char *)mdp.out.payload;
int i=0;
while(i<mdp.in.payload_length && mdp.in.payload[i] && mdp.in.payload[i]!='|')
while(i<mdp.out.payload_length && mdp.out.payload[i] && mdp.out.payload[i]!='|')
i++;
mdp.in.payload[i]=0;
char *name = (char *)mdp.in.payload+i+1;
char *sid = alloca_tohex_sid_t(mdp.in.src.sid);
mdp.out.payload[i]=0;
char *name = (char *)mdp.out.payload+i+1;
char *sid = alloca_tohex_sid_t(mdp.out.src.sid);
// TODO check that did is a valid phone number

View File

@ -467,7 +467,7 @@ void handle_reply_line(const char *bufp, size_t len)
else {
if (config.debug.dnahelper)
DEBUGF("DNAHELPER reply %s", alloca_toprint(-1, bufp, len));
overlay_mdp_dnalookup_reply(request_source, request_port, &my_subscriber->sid, uri, did, name);
overlay_mdp_dnalookup_reply(request_source, request_port, my_subscriber, uri, did, name);
}
}
} else {

View File

@ -272,8 +272,8 @@ int overlay_mdp_recv(int mdp_sockfd, overlay_mdp_frame *mdp, mdp_port_t port, in
return WHYF("reply did not come from server: %s", alloca_socket_address(&recvaddr));
// silently drop incoming packets for the wrong port number
if (port>0 && port != mdp->in.dst.port){
WARNF("Ignoring packet for port %"PRImdp_port_t,mdp->in.dst.port);
if (port>0 && port != mdp->out.dst.port){
WARNF("Ignoring packet for port %"PRImdp_port_t,mdp->out.dst.port);
return -1;
}
@ -349,7 +349,10 @@ ssize_t overlay_mdp_relevant_bytes(overlay_mdp_frame *mdp)
len=(&mdp->out.payload[0]-(unsigned char *)mdp) + mdp->out.payload_length;
break;
case MDP_BIND:
len=(&mdp->raw[0] - (char *)mdp) + sizeof(sockaddr_mdp);
// make sure that the compiler has actually given these two structures the same address
// I've seen gcc 4.8.1 on x64 fail to give elements the same address once
assert((void *)mdp->raw == (void *)&mdp->bind);
len=(&mdp->raw[0] - (char *)mdp) + sizeof(struct mdp_sockaddr);
break;
case MDP_SCAN:
len=(&mdp->raw[0] - (char *)mdp) + sizeof(struct overlay_mdp_scan);

View File

@ -72,8 +72,6 @@ struct mdp_identity_request{
// the request is followed by a list of SID's or NULL terminated entry pins for the remainder of the payload
};
#pragma pack(pop)
struct overlay_route_record{
sid_t sid;
char interface_name[256];
@ -85,6 +83,45 @@ struct overlay_mdp_scan{
struct in_addr addr;
};
struct overlay_mdp_data_frame {
struct mdp_sockaddr src;
struct mdp_sockaddr dst;
uint16_t payload_length;
int queue;
int ttl;
unsigned char payload[MDP_MTU-100];
};
struct overlay_mdp_error {
unsigned int error;
char message[128];
};
struct overlay_mdp_addrlist {
int mode;
#define OVERLAY_MDP_ADDRLIST_MAX_SID_COUNT (~(unsigned int)0)
unsigned int server_sid_count;
unsigned int first_sid;
unsigned int last_sid;
unsigned int frame_sid_count; /* how many of the following slots are populated */
sid_t sids[MDP_MAX_SID_REQUEST];
};
typedef struct overlay_mdp_frame {
uint16_t packetTypeAndFlags;
union {
struct overlay_mdp_data_frame out;
struct mdp_sockaddr bind;
struct overlay_mdp_addrlist addrlist;
struct overlay_mdp_error error;
char raw[MDP_MTU];
};
} overlay_mdp_frame;
#pragma pack(pop)
/* low level V2 mdp interface */
int mdp_socket(void);
int mdp_close(int socket);
@ -93,6 +130,7 @@ ssize_t mdp_recv(int socket, struct mdp_header *header, uint8_t *payload, ssize_
int mdp_poll(int socket, time_ms_t timeout_ms);
/* Client-side MDP function */
int overlay_mdp_client_socket(void);
int overlay_mdp_client_close(int mdp_sockfd);

View File

@ -474,7 +474,7 @@ static int monitor_lookup_match(const struct cli_parsed *parsed, struct cli_cont
char uri[256];
snprintf(uri, sizeof(uri), "sid://%s/external/%s", alloca_tohex_sid_t(my_subscriber->sid), ext);
DEBUGF("Sending response to %s for %s", sid, uri);
overlay_mdp_dnalookup_reply(destination, dest_port, &my_subscriber->sid, uri, ext, name);
overlay_mdp_dnalookup_reply(destination, dest_port, my_subscriber, uri, ext, name);
return 0;
}

View File

@ -236,6 +236,8 @@ int overlay_send_probe(struct subscriber *peer, struct network_destination *dest
if (destination->last_tx + destination->tick_ms > now)
return -1;
// TODO enhance overlay_send_frame to support pre-supplied network destinations
struct overlay_frame *frame=malloc(sizeof(struct overlay_frame));
bzero(frame,sizeof(struct overlay_frame));
frame->type=OF_TYPE_DATA;
@ -249,7 +251,6 @@ int overlay_send_probe(struct subscriber *peer, struct network_destination *dest
return -1;
}
frame->source_full = 1;
// TODO call mdp payload encryption / signing without calling overlay_mdp_dispatch...
overlay_mdp_encode_ports(frame->payload, MDP_PORT_ECHO, MDP_PORT_PROBE);
@ -291,17 +292,14 @@ int overlay_mdp_service_stun_req(struct internal_mdp_header *header, struct over
if (config.debug.overlayrouting)
DEBUGF("Processing STUN request from %s", alloca_tohex_sid_t(header->source->sid));
overlay_mdp_frame reply;
bzero(&reply, sizeof(reply));
reply.packetTypeAndFlags=MDP_TX;
struct internal_mdp_header reply;
bzero(&reply, sizeof reply);
reply.out.dst.sid = header->source->sid;
reply.out.src.sid = header->destination?header->destination->sid:my_subscriber->sid;
reply.out.src.port=MDP_PORT_STUNREQ;
reply.out.dst.port=MDP_PORT_STUN;
reply.out.queue=OQ_MESH_MANAGEMENT;
mdp_init_response(header, &reply);
reply.qos = OQ_MESH_MANAGEMENT;
struct overlay_buffer *replypayload = ob_static(reply.out.payload, sizeof(reply.out.payload));
struct overlay_buffer *replypayload = ob_new();
ob_limitsize(replypayload, MDP_MTU);
ob_checkpoint(replypayload);
while (ob_remaining(payload) > 0) {
@ -319,12 +317,12 @@ int overlay_mdp_service_stun_req(struct internal_mdp_header *header, struct over
ob_checkpoint(replypayload);
}
ob_rewind(replypayload);
reply.out.payload_length=ob_position(replypayload);
if (reply.out.payload_length){
if (ob_position(replypayload)){
if (config.debug.overlayrouting)
DEBUGF("Sending reply");
overlay_mdp_dispatch(&reply, NULL);
ob_flip(replypayload);
overlay_send_frame(&reply, replypayload);
}
ob_free(replypayload);
return 0;
@ -376,23 +374,25 @@ int overlay_send_stun_request(struct subscriber *server, struct subscriber *requ
request->last_stun_request=now;
overlay_mdp_frame mdp;
bzero(&mdp, sizeof(mdp));
mdp.packetTypeAndFlags=MDP_TX;
struct internal_mdp_header header;
bzero(&header, sizeof header);
header.source = my_subscriber;
header.destination = server;
mdp.out.src.sid = my_subscriber->sid;
mdp.out.dst.sid = server->sid;
mdp.out.src.port=MDP_PORT_STUN;
mdp.out.dst.port=MDP_PORT_STUNREQ;
mdp.out.queue=OQ_MESH_MANAGEMENT;
header.source_port = MDP_PORT_STUN;
header.destination_port = MDP_PORT_STUNREQ;
header.qos = OQ_MESH_MANAGEMENT;
struct overlay_buffer *payload = ob_new();
ob_limitsize(payload, MDP_MTU);
struct overlay_buffer *payload = ob_static(mdp.out.payload, sizeof(mdp.out.payload));
overlay_address_append(NULL, payload, request);
if (!ob_overrun(payload)) {
mdp.out.payload_length=ob_position(payload);
if (config.debug.overlayrouting)
DEBUGF("Sending STUN request to %s", alloca_tohex_sid_t(server->sid));
overlay_mdp_dispatch(&mdp, NULL);
ob_flip(payload);
overlay_send_frame(&header, payload);
}
ob_free(payload);
return 0;

View File

@ -114,7 +114,7 @@ void overlay_mdp_clean_socket_files()
closedir(dir);
}
void overlay_mdp_fill_legacy(
static void overlay_mdp_fill_legacy(
const struct internal_mdp_header *header,
struct overlay_buffer *payload,
overlay_mdp_frame *mdp)
@ -485,8 +485,8 @@ int overlay_saw_mdp_containing_frame(struct overlay_frame *f)
struct internal_mdp_header header;
bzero(&header, sizeof header);
header.qos = mdp.in.queue = f->queue;
header.ttl = mdp.in.ttl = f->ttl;
header.qos = mdp.out.queue = f->queue;
header.ttl = mdp.out.ttl = f->ttl;
header.source = f->source;
header.destination = f->destination;
header.receive_interface = f->interface;
@ -497,8 +497,8 @@ int overlay_saw_mdp_containing_frame(struct overlay_frame *f)
header.crypt_flags |= MDP_FLAG_NO_SIGN;
/* Get source and destination addresses */
mdp.in.dst.sid = (f->destination) ? f->destination->sid : SID_BROADCAST;
mdp.in.src.sid = f->source->sid;
mdp.out.dst.sid = (f->destination) ? f->destination->sid : SID_BROADCAST;
mdp.out.src.sid = f->source->sid;
/* copy crypto flags from frame so that we know if we need to decrypt or verify it */
struct overlay_buffer *mdp_payload = overlay_mdp_decrypt(&header, f->payload);
@ -563,6 +563,7 @@ static int overlay_saw_mdp_frame(
case 0:
{
overlay_mdp_frame mdp;
bzero(&mdp, sizeof mdp);
ob_checkpoint(payload);
overlay_mdp_fill_legacy(header, payload, &mdp);
ob_rewind(payload);
@ -623,33 +624,36 @@ static int overlay_saw_mdp_frame(
}
int overlay_mdp_dnalookup_reply(struct subscriber *dest, mdp_port_t dest_port,
const sid_t *resolved_sidp, const char *uri, const char *did, const char *name)
struct subscriber *resolved_sid, const char *uri, const char *did, const char *name)
{
if (config.debug.mdprequests)
DEBUGF("MDP_PORT_DNALOOKUP resolved_sid=%s uri=%s did=%s name=%s",
alloca_tohex_sid_t(*resolved_sidp),
alloca_tohex_sid_t(resolved_sid->sid),
alloca_str_toprint(uri),
alloca_str_toprint(did),
alloca_str_toprint(name)
);
overlay_mdp_frame mdpreply;
bzero(&mdpreply, sizeof mdpreply);
mdpreply.packetTypeAndFlags = MDP_TX; // outgoing MDP message
mdpreply.out.queue=OQ_ORDINARY;
mdpreply.out.src.sid = *resolved_sidp;
mdpreply.out.src.port = MDP_PORT_DNALOOKUP;
mdpreply.out.dst.sid = dest->sid;
mdpreply.out.dst.port = dest_port;
struct internal_mdp_header header;
bzero(&header, sizeof header);
header.qos = OQ_ORDINARY;
header.source = resolved_sid;
header.source_port = MDP_PORT_DNALOOKUP;
header.destination = dest;
header.destination_port = dest_port;
/* build reply as TOKEN|URI|DID|NAME|<NUL> */
strbuf b = strbuf_local((char *)mdpreply.out.payload, sizeof mdpreply.out.payload);
strbuf_tohex(b, SID_STRLEN, resolved_sidp->binary);
char buff[256];
strbuf b = strbuf_local(buff, sizeof buff);
strbuf_tohex(b, SID_STRLEN, resolved_sid->sid.binary);
strbuf_sprintf(b, "|%s|%s|%s|", uri, did, name?name:"");
if (strbuf_overrun(b))
return WHY("MDP payload overrun");
mdpreply.out.payload_length = strbuf_len(b) + 1;
/* deliver reply */
return overlay_mdp_dispatch(&mdpreply, NULL);
struct overlay_buffer *payload = ob_static((unsigned char*)buff, sizeof buff);
ob_limitsize(payload, strlen(buff));
int ret = overlay_send_frame(&header, payload);
ob_free(payload);
return ret;
}
static int overlay_mdp_check_binding(struct subscriber *subscriber, mdp_port_t port,
@ -912,7 +916,7 @@ int overlay_send_frame(struct internal_mdp_header *header,
This is for use by the SERVER.
Clients should use overlay_mdp_send()
*/
int overlay_mdp_dispatch(overlay_mdp_frame *mdp, struct socket_address *client)
static int overlay_mdp_dispatch(overlay_mdp_frame *mdp, struct socket_address *client)
{
IN();
unsigned __d = 0;
@ -1003,7 +1007,7 @@ int overlay_mdp_dispatch(overlay_mdp_frame *mdp, struct socket_address *client)
}
static int search_subscribers(struct subscriber *subscriber, void *context){
overlay_mdp_addrlist *response = context;
struct overlay_mdp_addrlist *response = context;
if (response->mode == MDP_ADDRLIST_MODE_SELF && subscriber->reachable != REACHABLE_SELF){
return 0;
@ -1026,7 +1030,7 @@ static int search_subscribers(struct subscriber *subscriber, void *context){
return 0;
}
int overlay_mdp_address_list(overlay_mdp_addrlist *request, overlay_mdp_addrlist *response)
int overlay_mdp_address_list(struct overlay_mdp_addrlist *request, struct overlay_mdp_addrlist *response)
{
if (config.debug.mdprequests)
DEBUGF("MDP_GETADDRS first_sid=%u mode=%d", request->first_sid, request->mode);

View File

@ -193,22 +193,24 @@ int overlay_mdp_service_dnalookup(struct internal_mdp_header *header, struct ove
int results=0;
while(keyring_find_did(keyring,&cn,&in,&kp,did))
{
struct keypair *keypair = keyring->contexts[cn]->identities[in]->keypairs[kp];
/* package DID and Name into reply (we include the DID because
it could be a wild-card DID search, but the SID is implied
in the source address of our reply). */
if (keyring->contexts[cn]->identities[in]->keypairs[kp]->private_key_len > DID_MAXSIZE)
if (keypair->private_key_len > DID_MAXSIZE)
/* skip excessively long DID records */
continue;
const sid_t *sidp = (const sid_t *) keyring->contexts[cn]->identities[in]->keypairs[0]->public_key;
const char *unpackedDid = (const char *) keyring->contexts[cn]->identities[in]->keypairs[kp]->private_key;
const char *name = (const char *)keyring->contexts[cn]->identities[in]->keypairs[kp]->public_key;
struct subscriber *subscriber = keyring->contexts[cn]->identities[in]->subscriber;
const char *unpackedDid = (const char *) keypair->private_key;
const char *name = (const char *)keypair->public_key;
// URI is sid://SIDHEX/DID
strbuf b = strbuf_alloca(SID_STRLEN + DID_MAXSIZE + 10);
strbuf_puts(b, "sid://");
strbuf_tohex(b, SID_STRLEN, sidp->binary);
strbuf_tohex(b, SID_STRLEN, subscriber->sid.binary);
strbuf_puts(b, "/local/");
strbuf_puts(b, unpackedDid);
overlay_mdp_dnalookup_reply(header->source, header->source_port, sidp, strbuf_str(b), unpackedDid, name);
overlay_mdp_dnalookup_reply(header->source, header->source_port, subscriber, strbuf_str(b), unpackedDid, name);
kp++;
results++;
}

View File

@ -701,7 +701,7 @@ void rhizome_list_release(struct rhizome_list_cursor *);
#define MAX_RHIZOME_MANIFESTS 40
#define MAX_CANDIDATES 32
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct socket_address *addr, const sid_t *peersidp);
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct socket_address *addr, const struct subscriber *peer);
rhizome_manifest * rhizome_fetch_search(const unsigned char *id, int prefix_length);
/* Rhizome file storage api */
@ -885,7 +885,7 @@ enum rhizome_start_fetch_result {
enum rhizome_start_fetch_result
rhizome_fetch_request_manifest_by_prefix(const struct socket_address *addr,
const sid_t *peersidp,
const struct subscriber *peer,
const unsigned char *prefix, size_t prefix_length);
int rhizome_any_fetch_active();
int rhizome_any_fetch_queued();

View File

@ -476,8 +476,6 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r)
DEBUGF("Dispatch size_high=%"PRId64,r->cursor->size_high);
rhizome_direct_transport_state_http *state = r->transport_specific_state;
sid_t zerosid = SID_ANY;
int sock=socket(AF_INET, SOCK_STREAM, 0);
if (sock==-1) {
WHY_perror("socket");
@ -636,7 +634,7 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r)
*/
if (config.debug.rhizome_tx)
DEBUGF("Fetching manifest %s* @ 0x%x",alloca_tohex(&actionlist[i], 1+RHIZOME_BAR_PREFIX_BYTES),i);
if (!rhizome_fetch_request_manifest_by_prefix(&addr, &zerosid, &actionlist[i+1], RHIZOME_BAR_PREFIX_BYTES))
if (!rhizome_fetch_request_manifest_by_prefix(&addr, NULL, &actionlist[i+1], RHIZOME_BAR_PREFIX_BYTES))
{
/* Fetching the manifest, and then using it to see if we want to
fetch the file for import is all handled asynchronously, so just

View File

@ -27,6 +27,8 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "str.h"
#include "strbuf_helpers.h"
#include "overlay_address.h"
#include "overlay_packet.h"
#include "overlay_buffer.h"
#include "socket.h"
#include "dataformats.h"
@ -39,7 +41,7 @@ struct rhizome_fetch_candidate {
Can be either IP+port for HTTP or it can be a SID
for MDP. */
struct socket_address addr;
sid_t peer_sid;
const struct subscriber *peer;
int priority;
};
@ -52,7 +54,7 @@ struct rhizome_fetch_slot {
rhizome_manifest *manifest;
struct socket_address addr;
sid_t peer_sid;
const struct subscriber *peer;
int state;
#define RHIZOME_FETCH_FREE 0
@ -216,7 +218,7 @@ int rhizome_fetch_status_html(strbuf b)
fetch_state(q->active.state),
q->active.write_state.file_offset,
q->active.manifest->filesize,
alloca_tohex_sid_t_trunc(q->active.peer_sid, 16));
q->active.peer?alloca_tohex_sid_t_trunc(q->active.peer->sid, 16):"unknown");
}else{
strbuf_puts(b, "inactive");
}
@ -612,7 +614,7 @@ schedule_fetch(struct rhizome_fetch_slot *slot)
if (config.debug.rhizome_rx)
DEBUGF("RHIZOME HTTP REQUEST addr=%s sid=%s %s",
alloca_socket_address(&slot->addr),
alloca_tohex_sid_t(slot->peer_sid),
slot->peer?alloca_tohex_sid_t(slot->peer->sid):"unknown",
alloca_str_toprint(slot->request)
);
slot->alarm.poll.fd = sock;
@ -677,7 +679,7 @@ schedule_fetch(struct rhizome_fetch_slot *slot)
*/
static enum rhizome_start_fetch_result
rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m,
const struct socket_address *addr, const sid_t *peersidp)
const struct socket_address *addr, const struct subscriber *peer)
{
IN();
if (slot->state != RHIZOME_FETCH_FREE)
@ -761,7 +763,7 @@ rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m,
/* Prepare for fetching */
slot->addr = *addr;
slot->peer_sid = *peersidp;
slot->peer = peer;
slot->manifest = m;
enum rhizome_start_fetch_result result = schedule_fetch(slot);
@ -781,7 +783,7 @@ rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m,
*/
enum rhizome_start_fetch_result
rhizome_fetch_request_manifest_by_prefix(const struct socket_address *addr,
const sid_t *peersidp,
const struct subscriber *peer,
const unsigned char *prefix, size_t prefix_length)
{
assert(addr);
@ -792,7 +794,7 @@ rhizome_fetch_request_manifest_by_prefix(const struct socket_address *addr,
/* Prepare for fetching via HTTP */
slot->addr = *addr;
slot->manifest = NULL;
slot->peer_sid = *peersidp;
slot->peer = peer;
bcopy(prefix, slot->bid.binary, prefix_length);
slot->prefix_length=prefix_length;
@ -817,7 +819,7 @@ static void rhizome_start_next_queued_fetch(struct rhizome_fetch_slot *slot)
unsigned i = 0;
struct rhizome_fetch_candidate *c;
while (i < q->candidate_queue_size && (c = &q->candidate_queue[i])->manifest) {
int result = rhizome_fetch(slot, c->manifest, &c->addr, &c->peer_sid);
int result = rhizome_fetch(slot, c->manifest, &c->addr, c->peer);
switch (result) {
case SLOTBUSY:
OUT(); return;
@ -890,7 +892,7 @@ int rhizome_fetch_has_queue_space(unsigned char log2_size){
*
* @author Andrew Bettison <andrew@servalproject.com>
*/
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct socket_address *addr, const sid_t *peersidp)
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct socket_address *addr, const struct subscriber *peer)
{
IN();
@ -994,7 +996,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock
c->manifest = m;
c->priority = priority;
c->addr = *addr;
c->peer_sid = *peersidp;
c->peer = peer;
if (config.debug.rhizome_rx) {
DEBUG("Rhizome fetch queues:");
@ -1109,20 +1111,19 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot)
// faster. Optimising behaviour when there is no packet loss is an
// outstanding task.
overlay_mdp_frame mdp;
bzero(&mdp,sizeof(mdp));
mdp.out.src.sid = my_subscriber->sid;
mdp.out.src.port=MDP_PORT_RHIZOME_RESPONSE;
mdp.out.dst.sid = slot->peer_sid;
mdp.out.dst.port=MDP_PORT_RHIZOME_REQUEST;
mdp.out.ttl=1;
mdp.packetTypeAndFlags=MDP_TX;
mdp.out.queue=OQ_ORDINARY;
mdp.out.payload_length= sizeof slot->bid.binary + 8 + 8 + 4 + 2;
bcopy(slot->bid.binary, &mdp.out.payload[0], sizeof slot->bid.binary);
struct internal_mdp_header header;
bzero(&header, sizeof header);
header.source = my_subscriber;
header.source_port = MDP_PORT_RHIZOME_RESPONSE;
header.destination = (struct subscriber *)slot->peer;
header.destination_port = MDP_PORT_RHIZOME_REQUEST;
header.ttl = 1;
header.qos = OQ_ORDINARY;
struct overlay_buffer *payload = ob_new();
ob_append_bytes(payload, slot->bid.binary, sizeof slot->bid.binary);
uint32_t bitmap=0;
int requests=32;
int i;
@ -1139,20 +1140,22 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot)
}
offset+=slot->mdpRXBlockLength;
}
write_uint64(&mdp.out.payload[sizeof slot->bid.binary], slot->bidVersion);
write_uint64(&mdp.out.payload[sizeof slot->bid.binary + 8], slot->write_state.file_offset);
write_uint32(&mdp.out.payload[sizeof slot->bid.binary + 8 + 8], bitmap);
write_uint16(&mdp.out.payload[sizeof slot->bid.binary + 8 + 8 + 4], slot->mdpRXBlockLength);
ob_append_ui64_rv(payload, slot->bidVersion);
ob_append_ui64_rv(payload, slot->write_state.file_offset);
ob_append_ui32_rv(payload, bitmap);
ob_append_ui16_rv(payload, slot->mdpRXBlockLength);
if (config.debug.rhizome_tx)
DEBUGF("src sid=%s, dst sid=%s, mdpRXWindowStart=0x%"PRIx64", slot->bidVersion=0x%"PRIx64,
alloca_tohex_sid_t(mdp.out.src.sid),
alloca_tohex_sid_t(mdp.out.dst.sid),
alloca_tohex_sid_t(header.source->sid),
alloca_tohex_sid_t(header.destination->sid),
slot->write_state.file_offset,
slot->bidVersion);
overlay_mdp_dispatch(&mdp, NULL);
ob_flip(payload);
overlay_send_frame(&header, payload);
ob_free(payload);
// remember when we sent the request so that we can adjust the inter-request
// interval based on how fast the packets arrive.
@ -1330,7 +1333,7 @@ static int rhizome_write_complete(struct rhizome_fetch_slot *slot)
alloca_tohex_rhizome_filehash_t(slot->manifest->filehash));
} else {
INFOF("Completed MDP request from %s for file %s",
alloca_tohex_sid_t(slot->peer_sid),
slot->peer?alloca_tohex_sid_t(slot->peer->sid):"unknown",
alloca_tohex_rhizome_filehash_t(slot->manifest->filehash));
}
} else {
@ -1353,9 +1356,9 @@ static int rhizome_write_complete(struct rhizome_fetch_slot *slot)
DEBUGF("All looks good for importing manifest id=%s, addr=%s, sid=%s",
alloca_tohex_rhizome_bid_t(m->cryptoSignPublic),
alloca_socket_address(&slot->addr),
alloca_tohex_sid_t(slot->peer_sid));
slot->peer?alloca_tohex_sid_t(slot->peer->sid):"unknown");
}
rhizome_suggest_queue_manifest_import(m, &slot->addr, &slot->peer_sid);
rhizome_suggest_queue_manifest_import(m, &slot->addr, slot->peer);
}
}
}

View File

@ -383,7 +383,7 @@ int overlay_rhizome_saw_advertisements(struct decode_context *context, struct ov
DEBUG("Not seen before.");
// start the fetch process!
rhizome_suggest_queue_manifest_import(m, &httpaddr, &f->source->sid);
rhizome_suggest_queue_manifest_import(m, &httpaddr, f->source);
// the above function will free the manifest structure, make sure we don't free it again
m=NULL;
@ -399,10 +399,10 @@ next:
if (f->source->sync_state)
goto end;
overlay_mdp_frame mdp;
struct internal_mdp_header header;
bzero(&header, sizeof header);
bzero(&mdp,sizeof(mdp));
mdp.out.payload_length=0;
struct overlay_buffer *payload = NULL;
// parse BAR's
unsigned char *bars[50];
@ -450,22 +450,24 @@ next:
test_count++;
if (rhizome_is_bar_interesting(bars[index])==1){
// add a request for the manifest
if (mdp.out.payload_length==0){
mdp.out.src.sid = my_subscriber->sid;
mdp.out.src.port=MDP_PORT_RHIZOME_RESPONSE;
mdp.out.dst.sid = f->source->sid;
mdp.out.dst.port=MDP_PORT_RHIZOME_MANIFEST_REQUEST;
if (f->source->reachable&REACHABLE_DIRECT)
mdp.out.ttl=1;
else
mdp.out.ttl=64;
mdp.packetTypeAndFlags=MDP_TX;
if (!payload){
header.source = my_subscriber;
header.source_port = MDP_PORT_RHIZOME_RESPONSE;
header.destination = f->source;
header.destination_port = MDP_PORT_RHIZOME_MANIFEST_REQUEST;
mdp.out.queue=OQ_ORDINARY;
if (f->source->reachable&REACHABLE_DIRECT)
header.ttl=1;
else
header.ttl=64;
header.qos=OQ_ORDINARY;
payload = ob_new();
}
DEBUGF("Requesting manifest for BAR %s", alloca_tohex(bars[index], RHIZOME_BAR_BYTES));
bcopy(bars[index], &mdp.out.payload[mdp.out.payload_length], RHIZOME_BAR_BYTES);
mdp.out.payload_length+=RHIZOME_BAR_BYTES;
if (config.debug.rhizome)
DEBUGF("Requesting manifest for BAR %s", alloca_tohex(bars[index], RHIZOME_BAR_BYTES));
ob_append_bytes(payload, bars[index], RHIZOME_BAR_BYTES);
}
}
@ -476,8 +478,11 @@ next:
else
lookup_time = (end_time - start_time);
if (mdp.out.payload_length>0)
overlay_mdp_dispatch(&mdp, NULL);
if (payload){
ob_flip(payload);
overlay_send_frame(&header, payload);
ob_free(payload);
}
end:
sqlite_set_tracefunc(oldfunc);

View File

@ -75,25 +75,25 @@ void rhizome_sync_status_html(struct strbuf *b, struct subscriber *subscriber)
static void rhizome_sync_request(struct subscriber *subscriber, uint64_t token, unsigned char forwards)
{
overlay_mdp_frame mdp;
bzero(&mdp,sizeof(mdp));
mdp.out.src.sid = my_subscriber->sid;
mdp.out.src.port=MDP_PORT_RHIZOME_SYNC;
mdp.out.dst.sid = subscriber->sid;
mdp.out.dst.port=MDP_PORT_RHIZOME_SYNC;
mdp.packetTypeAndFlags=MDP_TX;
mdp.out.queue=OQ_OPPORTUNISTIC;
struct overlay_buffer *b = ob_static(mdp.out.payload, sizeof(mdp.out.payload));
struct internal_mdp_header header;
bzero(&header, sizeof header);
header.source = my_subscriber;
header.source_port = MDP_PORT_RHIZOME_SYNC;
header.destination = subscriber;
header.destination_port = MDP_PORT_RHIZOME_SYNC;
header.qos = OQ_OPPORTUNISTIC;
struct overlay_buffer *b = ob_new();
ob_append_byte(b, MSG_TYPE_REQ);
ob_append_byte(b, forwards);
ob_append_packed_ui64(b, token);
mdp.out.payload_length = ob_position(b);
if (config.debug.rhizome)
DEBUGF("Sending request to %s for BARs from %"PRIu64" %s", alloca_tohex_sid_t(subscriber->sid), token, forwards?"forwards":"backwards");
overlay_mdp_dispatch(&mdp, NULL);
ob_flip(b);
overlay_send_frame(&header, b);
ob_free(b);
}
@ -103,9 +103,10 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi
time_ms_t now = gettime_ms();
// send requests for manifests that we have room to fetch
overlay_mdp_frame mdp;
bzero(&mdp,sizeof(mdp));
struct internal_mdp_header header;
bzero(&header, sizeof header);
struct overlay_buffer *payload = NULL;
for (i=0;i < state->bar_count;i++){
if (state->bars[i].next_request > now)
continue;
@ -126,28 +127,35 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi
if (m && m->version >= version)
continue;
if (mdp.out.payload_length==0){
mdp.out.src.sid = my_subscriber->sid;
mdp.out.src.port=MDP_PORT_RHIZOME_RESPONSE;
mdp.out.dst.sid = subscriber->sid;
mdp.out.dst.port=MDP_PORT_RHIZOME_MANIFEST_REQUEST;
mdp.packetTypeAndFlags=MDP_TX;
mdp.out.queue=OQ_OPPORTUNISTIC;
if (!payload){
header.source = my_subscriber;
header.source_port = MDP_PORT_RHIZOME_RESPONSE;
header.destination = subscriber;
header.destination_port = MDP_PORT_RHIZOME_MANIFEST_REQUEST;
header.qos = OQ_OPPORTUNISTIC;
payload = ob_new();
ob_limitsize(payload, MDP_MTU);
}
if (mdp.out.payload_length + RHIZOME_BAR_BYTES>MDP_MTU)
if (ob_remaining(payload)<RHIZOME_BAR_BYTES)
break;
if (config.debug.rhizome)
DEBUGF("Requesting manifest for BAR %s", alloca_tohex(state->bars[i].bar, RHIZOME_BAR_BYTES));
bcopy(state->bars[i].bar, &mdp.out.payload[mdp.out.payload_length], RHIZOME_BAR_BYTES);
mdp.out.payload_length+=RHIZOME_BAR_BYTES;
ob_append_bytes(payload, state->bars[i].bar, RHIZOME_BAR_BYTES);
state->bars[i].next_request = now+1000;
requests++;
if (requests>=BARS_PER_RESPONSE)
break;
}
if (mdp.out.payload_length!=0)
overlay_mdp_dispatch(&mdp, NULL);
if (payload){
ob_flip(payload);
overlay_send_frame(&header, payload);
ob_free(payload);
}
// send request for more bars if we have room to cache them
if (state->bar_count >= CACHE_BARS)
@ -340,29 +348,20 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t
if (max_count == 0 || max_count > BARS_PER_RESPONSE)
max_count = BARS_PER_RESPONSE;
overlay_mdp_frame mdp;
bzero(&mdp,sizeof(mdp));
mdp.out.src.sid = my_subscriber->sid;
mdp.out.src.port=MDP_PORT_RHIZOME_SYNC;
mdp.out.dst.port=MDP_PORT_RHIZOME_SYNC;
mdp.packetTypeAndFlags=MDP_TX;
mdp.out.queue=OQ_OPPORTUNISTIC;
if (dest){
mdp.out.dst.sid = dest->sid;
}else{
mdp.out.dst.sid = SID_BROADCAST;
mdp.packetTypeAndFlags|=(MDP_NOCRYPT|MDP_NOSIGN);
struct internal_mdp_header header;
bzero(&header, sizeof header);
header.source = my_subscriber;
header.source_port = MDP_PORT_RHIZOME_SYNC;
header.destination = dest;
header.destination_port = MDP_PORT_RHIZOME_SYNC;
header.qos = OQ_OPPORTUNISTIC;
if (!dest){
header.crypt_flags = (MDP_FLAG_NO_CRYPT|MDP_FLAG_NO_SIGN);
header.ttl = 1;
}
if (!dest)
mdp.out.ttl=1;
struct overlay_buffer *b = ob_static(mdp.out.payload, sizeof(mdp.out.payload));
ob_append_byte(b, MSG_TYPE_BARS);
ob_checkpoint(b);
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
sqlite3_stmt *statement;
if (forwards){
@ -378,6 +377,11 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t
int count=0;
uint64_t last=0;
struct overlay_buffer *b = ob_new();
ob_limitsize(b, MDP_MTU);
ob_append_byte(b, MSG_TYPE_BARS);
ob_checkpoint(b);
while(sqlite_step_retry(&retry, statement)==SQLITE_ROW){
uint64_t rowid = sqlite3_column_int64(statement, 0);
const unsigned char *bar = sqlite3_column_blob(statement, 1);
@ -438,10 +442,10 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t
sqlite3_finalize(statement);
if (count){
mdp.out.payload_length = ob_position(b);
if (config.debug.rhizome_ads)
DEBUGF("Sending %d BARs from %"PRIu64" to %"PRIu64, count, token, last);
overlay_mdp_dispatch(&mdp, NULL);
ob_flip(b);
overlay_send_frame(&header, b);
}
ob_free(b);
OUT();

View File

@ -358,74 +358,14 @@ int parseCommandLine(struct cli_context *context, const char *argv0, int argc, c
typedef uint32_t mdp_port_t;
#define PRImdp_port_t "#08" PRIx32
typedef struct sockaddr_mdp {
sid_t sid;
mdp_port_t port;
} sockaddr_mdp;
typedef struct overlay_mdp_data_frame {
sockaddr_mdp src;
sockaddr_mdp dst;
uint16_t payload_length;
int queue;
int ttl;
unsigned char payload[MDP_MTU-100];
} overlay_mdp_data_frame;
typedef struct overlay_mdp_error {
unsigned int error;
char message[128];
} overlay_mdp_error;
typedef struct overlay_mdp_addrlist {
int mode;
#define OVERLAY_MDP_ADDRLIST_MAX_SID_COUNT (~(unsigned int)0)
unsigned int server_sid_count;
unsigned int first_sid;
unsigned int last_sid;
unsigned int frame_sid_count; /* how many of the following 59 slots are populated */
sid_t sids[MDP_MAX_SID_REQUEST];
} overlay_mdp_addrlist;
typedef struct overlay_mdp_nodeinfo {
sid_t sid;
int sid_prefix_length; /* must be long enough to be unique */
int foundP;
int localP;
int neighbourP;
int score;
int interface_number;
time_ms_t time_since_last_observation;
} overlay_mdp_nodeinfo;
typedef struct overlay_mdp_frame {
uint16_t packetTypeAndFlags;
union {
overlay_mdp_data_frame out;
overlay_mdp_data_frame in;
sockaddr_mdp bind;
overlay_mdp_addrlist addrlist;
overlay_mdp_nodeinfo nodeinfo;
overlay_mdp_error error;
/* 2048 is too large (causes EMSGSIZE errors on OSX, but probably fine on
Linux) */
char raw[MDP_MTU];
};
} overlay_mdp_frame;
/* Server-side MDP functions */
void mdp_init_response(const struct internal_mdp_header *in, struct internal_mdp_header *out);
int overlay_mdp_dispatch(overlay_mdp_frame *mdp, struct socket_address *client);
void overlay_mdp_encode_ports(struct overlay_buffer *plaintext, mdp_port_t dst_port, mdp_port_t src_port);
int overlay_mdp_dnalookup_reply(struct subscriber *dest, mdp_port_t dest_port,
const sid_t *resolved_sidp, const char *uri, const char *did, const char *name);
struct subscriber *resolved_sid, const char *uri, const char *did, const char *name);
int overlay_send_frame(struct internal_mdp_header *header, struct overlay_buffer *payload);
void overlay_mdp_fill_legacy(
const struct internal_mdp_header *header,
struct overlay_buffer *payload,
overlay_mdp_frame *mdp);
int mdp_bind_internal(struct subscriber *subscriber, mdp_port_t port,
int (*internal)(struct internal_mdp_header *header, struct overlay_buffer *payload));
int mdp_unbind_internal(struct subscriber *subscriber, mdp_port_t port,
@ -485,8 +425,6 @@ int server_probe(int *pid);
int dna_helper_start();
int dna_helper_shutdown();
int dna_helper_enqueue(struct subscriber *source, mdp_port_t source_port, const char *did);
int dna_return_resolution(overlay_mdp_frame *mdp, unsigned char *fromSid,
const char *did,const char *name,const char *uri);
int parseDnaReply(const char *buf, size_t len, char *token, char *did, char *name, char *uri, const char **bufp);
extern int sigPipeFlag;
extern int sigIoFlag;