Refactor mdp packet processing to avoid using struct overlay_mdp_frame

This commit is contained in:
Jeremy Lakeman 2014-01-23 16:31:56 +10:30
parent ff2188b248
commit 487df0408d
16 changed files with 478 additions and 343 deletions

View File

@ -1131,12 +1131,11 @@ int app_mdp_ping(const struct cli_parsed *parsed, struct cli_context *context)
for (; sigIntFlag==0 && (icount==0 || tx_count<icount); ) {
// send a ping packet
{
if (tx_count==0 || !(mdp_header.flags & MDP_FLAG_BIND)){
uint8_t payload[12];
int *seq=(int *)payload;
*seq=sequence_number;
write_uint64(&payload[4], gettime_ms());
int r = mdp_send(mdp_sockfd, &mdp_header, payload, sizeof(payload));
if (r<0){
WARN_perror("mdp_send");
@ -1167,7 +1166,7 @@ int app_mdp_ping(const struct cli_parsed *parsed, struct cli_context *context)
if (mdp_recv_header.flags & MDP_FLAG_ERROR){
WHY("Serval daemon reported an error, please check the log for more information");
break;
continue;
}
if (mdp_recv_header.flags & MDP_FLAG_BIND){

View File

@ -1861,7 +1861,7 @@ static int keyring_process_challenge(keyring_file *k, struct subscriber *subscri
return 0;
}
int keyring_mapping_request(keyring_file *k, struct overlay_frame *frame, overlay_mdp_frame *req)
int keyring_mapping_request(keyring_file *k, struct internal_mdp_header *header, overlay_mdp_frame *req)
{
if (!k) return WHY("keyring is null");
if (!req) return WHY("req is null");
@ -1881,21 +1881,21 @@ int keyring_mapping_request(keyring_file *k, struct overlay_frame *frame, overla
case UNLOCK_REQUEST:
{
int len = req->out.payload_length;
if (crypto_verify_message(frame->destination, req->out.payload, &len))
if (crypto_verify_message(header->destination, req->out.payload, &len))
return WHY("Signature check failed");
req->out.payload_length = len;
}
return keyring_send_challenge(frame->destination, frame->source);
return keyring_send_challenge(header->destination, header->source);
case UNLOCK_CHALLENGE:
return keyring_respond_challenge(frame->source, req);
return keyring_respond_challenge(header->source, req);
case UNLOCK_RESPONSE:
{
int len = req->out.payload_length;
if (crypto_verify_message(frame->destination, req->out.payload, &len))
if (crypto_verify_message(header->destination, req->out.payload, &len))
return WHY("Signature check failed");
req->out.payload_length = len;
}
return keyring_process_challenge(k, frame->destination, req);
return keyring_process_challenge(k, header->destination, req);
}
return WHY("Not implemented");
}

View File

@ -116,7 +116,7 @@ int keyring_dump(keyring_file *k, XPRINTF xpf, int include_secret);
unsigned char *keyring_get_nm_bytes(const sid_t *known_sidp, const sid_t *unknown_sidp);
int keyring_mapping_request(keyring_file *k, struct overlay_frame *frame, overlay_mdp_frame *req);
int keyring_mapping_request(keyring_file *k, struct internal_mdp_header *header, overlay_mdp_frame *req);
int keyring_send_unlock(struct subscriber *subscriber);
void keyring_release_subscriber(keyring_file *k, const sid_t *sid);

View File

@ -72,7 +72,6 @@ int mdp_send(int socket, const struct mdp_header *header, const uint8_t *payload
}
};
return send_message(socket, &addr, &data);
}

View File

@ -126,6 +126,8 @@ schedule(&_sched_##X); }
overlay_mdp_setup_sockets();
monitor_setup_sockets();
overlay_mdp_bind_internal_services();
olsr_init_socket();
/* Get rhizome server started BEFORE populating fd list so that

View File

@ -281,11 +281,6 @@ void _ob_append_bytes(struct __sourceloc __whence, struct overlay_buffer *b, con
b->position += count;
}
void _ob_append_buffer(struct __sourceloc __whence, struct overlay_buffer *b, struct overlay_buffer *s)
{
ob_append_bytes(b, s->bytes, s->position);
}
void _ob_append_ui16(struct __sourceloc __whence, struct overlay_buffer *b, uint16_t v)
{
const int bytes = 2;
@ -428,6 +423,21 @@ void ob_skip(struct overlay_buffer *b, unsigned n)
b->position += n;
}
// return a null terminated string pointer and advance past the string
const char *ob_get_str_ptr(struct overlay_buffer *b)
{
const char *ret = (const char*)(b->bytes + b->position);
off_t ofs=0;
while (test_offset(b, ofs)==0){
if (ret[ofs]=='\0'){
b->position+=ofs+1;
return ret;
}
ofs++;
}
return NULL;
}
int ob_get_bytes(struct overlay_buffer *b, unsigned char *buff, size_t len)
{
if (test_offset(b, len))
@ -458,6 +468,18 @@ uint32_t ob_get_ui32(struct overlay_buffer *b)
return ret;
}
uint32_t ob_get_ui32_rv(struct overlay_buffer *b)
{
if (test_offset(b, 4))
return 0xFFFFFFFF; // ... unsigned
uint32_t ret = b->bytes[b->position]
| b->bytes[b->position +1] << 8
| b->bytes[b->position +2] << 16
| b->bytes[b->position +3] << 24;
b->position+=4;
return ret;
}
uint64_t ob_get_ui64(struct overlay_buffer *b)
{
if (test_offset(b, 8))
@ -474,6 +496,22 @@ uint64_t ob_get_ui64(struct overlay_buffer *b)
return ret;
}
uint64_t ob_get_ui64_rv(struct overlay_buffer *b)
{
if (test_offset(b, 8))
return 0xFFFFFFFF; // ... unsigned
uint64_t ret = (uint64_t)b->bytes[b->position]
| (uint64_t)b->bytes[b->position +1] << 8
| (uint64_t)b->bytes[b->position +2] << 16
| (uint64_t)b->bytes[b->position +3] << 24
| (uint64_t)b->bytes[b->position +4] << 32
| (uint64_t)b->bytes[b->position +5] << 40
| (uint64_t)b->bytes[b->position +6] << 48
| (uint64_t)b->bytes[b->position +7] << 56;
b->position+=8;
return ret;
}
uint16_t ob_get_ui16(struct overlay_buffer *b)
{
if (test_offset(b, 2))
@ -484,6 +522,16 @@ uint16_t ob_get_ui16(struct overlay_buffer *b)
return ret;
}
uint16_t ob_get_ui16_rv(struct overlay_buffer *b)
{
if (test_offset(b, 2))
return 0xFFFF; // ... unsigned
uint16_t ret = b->bytes[b->position]
| b->bytes[b->position +1] << 8;
b->position+=2;
return ret;
}
uint32_t ob_get_packed_ui32(struct overlay_buffer *b)
{
uint32_t ret=0;
@ -580,6 +628,11 @@ unsigned char *ob_ptr(struct overlay_buffer *b)
return b->bytes;
}
unsigned char *ob_current_ptr(struct overlay_buffer *b)
{
return &b->bytes[b->position];
}
int asprintable(int c)
{
if (c<' ') return '.';

View File

@ -59,7 +59,6 @@ void _ob_patch_rfs(struct __sourceloc __whence, struct overlay_buffer *b);
void _ob_append_byte(struct __sourceloc whence, struct overlay_buffer *b,unsigned char byte);
void _ob_append_bytes(struct __sourceloc whence, struct overlay_buffer *b,const unsigned char *bytes, size_t count);
void _ob_append_buffer(struct __sourceloc whence, struct overlay_buffer *b,struct overlay_buffer *s);
unsigned char *_ob_append_space(struct __sourceloc whence, struct overlay_buffer *b, size_t count);
void _ob_append_ui16(struct __sourceloc whence, struct overlay_buffer *b, uint16_t v);
void _ob_append_ui32(struct __sourceloc whence, struct overlay_buffer *b, uint32_t v);
@ -85,7 +84,6 @@ void _ob_append_rfs(struct __sourceloc whence, struct overlay_buffer *b,int l);
#define ob_append_byte(b, byte) _ob_append_byte(__WHENCE__, b, byte)
#define ob_append_bytes(b, bytes, count) _ob_append_bytes(__WHENCE__, b, bytes, count)
#define ob_append_buffer(b, s) _ob_append_buffer(__WHENCE__, b, s)
#define ob_append_space(b, count) _ob_append_space(__WHENCE__, b, count)
#define ob_append_ui16(b, v) _ob_append_ui16(__WHENCE__, b, v)
#define ob_append_ui32(b, v) _ob_append_ui32(__WHENCE__, b, v)
@ -101,9 +99,13 @@ void ob_skip(struct overlay_buffer *b, unsigned n);
int ob_get(struct overlay_buffer *b);
int ob_get_bytes(struct overlay_buffer *b, unsigned char *buff, size_t len);
unsigned char * ob_get_bytes_ptr(struct overlay_buffer *b, size_t len);
const char *ob_get_str_ptr(struct overlay_buffer *b);
uint64_t ob_get_ui64(struct overlay_buffer *b);
uint32_t ob_get_ui32(struct overlay_buffer *b);
uint16_t ob_get_ui16(struct overlay_buffer *b);
uint64_t ob_get_ui64_rv(struct overlay_buffer *b);
uint32_t ob_get_ui32_rv(struct overlay_buffer *b);
uint16_t ob_get_ui16_rv(struct overlay_buffer *b);
int ob_dump(struct overlay_buffer *b,char *desc);
uint32_t ob_get_packed_ui32(struct overlay_buffer *b);
@ -114,7 +116,10 @@ size_t ob_position(struct overlay_buffer *b);
size_t ob_limit(struct overlay_buffer *b);
size_t ob_remaining(struct overlay_buffer *b);
int _ob_overrun(struct __sourceloc, struct overlay_buffer *b);
// get the raw pointer of the whole buffer
unsigned char* ob_ptr(struct overlay_buffer *b);
// get the raw pointer of the current position
unsigned char* ob_current_ptr(struct overlay_buffer *b);
#define ob_overrun(b) _ob_overrun(__WHENCE__, b)

View File

@ -201,7 +201,7 @@ int load_subscriber_address(struct subscriber *subscriber)
/* Collection of unicast echo responses to detect working links */
int
overlay_mdp_service_probe(struct overlay_frame *frame, overlay_mdp_frame *mdp)
overlay_mdp_service_probe(struct internal_mdp_header *header, overlay_mdp_frame *mdp)
{
IN();
if (mdp->out.src.port!=MDP_PORT_ECHO){
@ -209,7 +209,7 @@ overlay_mdp_service_probe(struct overlay_frame *frame, overlay_mdp_frame *mdp)
RETURN(-1);
}
if (frame->source->reachable == REACHABLE_SELF)
if (header->source->reachable == REACHABLE_SELF)
RETURN(0);
uint8_t interface = mdp->out.payload[0];
@ -221,7 +221,7 @@ overlay_mdp_service_probe(struct overlay_frame *frame, overlay_mdp_frame *mdp)
bcopy(&mdp->out.payload[1], &addr.addr, addr.addrlen);
RETURN(link_unicast_ack(frame->source, &overlay_interfaces[interface], &addr));
RETURN(link_unicast_ack(header->source, &overlay_interfaces[interface], &addr));
OUT();
}

View File

@ -82,7 +82,10 @@ static struct sched_ent mdp_sock2 = {
.poll={.fd = -1},
};
static int overlay_saw_mdp_frame(struct overlay_frame *frame, overlay_mdp_frame *mdp);
static int overlay_saw_mdp_frame(
struct internal_mdp_header *header,
struct overlay_buffer *payload);
static int mdp_send2(struct socket_address *client, struct mdp_header *header,
const uint8_t *payload, size_t payload_len);
@ -111,6 +114,26 @@ void overlay_mdp_clean_socket_files()
closedir(dir);
}
void overlay_mdp_fill_legacy(
const struct internal_mdp_header *header,
struct overlay_buffer *payload,
overlay_mdp_frame *mdp)
{
mdp->out.src.sid = header->source->sid;
mdp->out.src.port = header->source_port;
mdp->out.dst.sid = header->destination?header->destination->sid:SID_BROADCAST;
mdp->out.dst.port = header->destination_port;
mdp->out.payload_length = ob_remaining(payload);
ob_get_bytes(payload, mdp->out.payload, mdp->out.payload_length);
mdp->out.ttl = header->ttl;
mdp->out.queue = header->qos;
mdp->packetTypeAndFlags=MDP_TX;
if (!(header->modifiers&OF_CRYPTO_CIPHERED))
mdp->packetTypeAndFlags |= MDP_NOCRYPT;
if (!(header->modifiers&OF_CRYPTO_SIGNED))
mdp->packetTypeAndFlags |= MDP_NOSIGN;
}
static int mdp_bind_socket(const char *name)
{
struct socket_address addr;
@ -158,7 +181,7 @@ struct mdp_binding{
struct subscriber *subscriber;
mdp_port_t port;
int version;
int (*internal)(const struct mdp_header *header, const uint8_t *payload, size_t len);
int (*internal)(struct internal_mdp_header *header, struct overlay_buffer *payload);
struct socket_address client;
time_ms_t binding_time;
};
@ -237,6 +260,7 @@ static int overlay_mdp_process_bind_request(struct subscriber *subscriber, mdp_p
if (port == 0){
return WHYF("Port %d cannot be bound", port);
}
if (!mdp_bindings_initialised) {
/* Mark all slots as unused */
int i;
@ -296,23 +320,31 @@ static int overlay_mdp_process_bind_request(struct subscriber *subscriber, mdp_p
}
int mdp_bind_internal(struct subscriber *subscriber, mdp_port_t port,
int (*internal)(const struct mdp_header *header, const uint8_t *payload, size_t len))
int (*internal)(struct internal_mdp_header *header, struct overlay_buffer *payload))
{
int i;
struct mdp_binding *free_slot=NULL;
if (!mdp_bindings_initialised) {
/* Mark all slots as unused */
int i;
for(i=0;i<MDP_MAX_BINDINGS;i++)
mdp_bindings[i].port=0;
mdp_bindings_initialised=1;
}
for(i=0;i<MDP_MAX_BINDINGS;i++) {
if ((!free_slot) && mdp_bindings[i].port==0)
free_slot=&mdp_bindings[i];
if (mdp_bindings[i].port == port
&& mdp_bindings[i].subscriber == subscriber)
return -1;
return WHYF("Internal binding for port %d failed, port already in use", port);
}
if (!free_slot)
return -1;
return WHYF("Internal binding for port %d failed, no free slots", port);
free_slot->subscriber=subscriber;
free_slot->port=port;
@ -323,7 +355,7 @@ int mdp_bind_internal(struct subscriber *subscriber, mdp_port_t port,
}
int mdp_unbind_internal(struct subscriber *subscriber, mdp_port_t port,
int (*internal)(const struct mdp_header *header, const uint8_t *payload, size_t len))
int (*internal)(struct internal_mdp_header *header, struct overlay_buffer *payload))
{
int i;
for(i=0;i<MDP_MAX_BINDINGS;i++) {
@ -338,104 +370,105 @@ int mdp_unbind_internal(struct subscriber *subscriber, mdp_port_t port,
return 0;
}
static int overlay_mdp_decode_header(struct overlay_buffer *buff, overlay_mdp_frame *mdp)
static void overlay_mdp_decode_header(struct internal_mdp_header *header, struct overlay_buffer *buff)
{
/* extract MDP port numbers */
mdp_port_t port = ob_get_packed_ui32(buff);
int same = port&1;
port >>=1;
mdp->in.dst.port = port;
header->destination_port = port;
if (!same)
port = ob_get_packed_ui32(buff);
mdp->in.src.port = port;
int len = ob_remaining(buff);
if (len<0)
return WHY("MDP payload is too short");
mdp->in.payload_length=len;
return ob_get_bytes(buff, &mdp->in.payload[0], len);
header->source_port = port;
}
int overlay_mdp_decrypt(struct overlay_frame *f, overlay_mdp_frame *mdp)
static struct overlay_buffer *overlay_mdp_decrypt(struct internal_mdp_header *header, struct overlay_buffer *payload)
{
IN();
/* Indicate MDP message type */
mdp->packetTypeAndFlags=MDP_TX;
switch(f->modifiers&(OF_CRYPTO_CIPHERED|OF_CRYPTO_SIGNED)) {
struct overlay_buffer *ret=NULL;
switch(header->modifiers&(OF_CRYPTO_CIPHERED|OF_CRYPTO_SIGNED)) {
case 0:
/* nothing to do, b already points to the plain text */
mdp->packetTypeAndFlags|=MDP_NOCRYPT|MDP_NOSIGN;
RETURN(overlay_mdp_decode_header(f->payload, mdp));
overlay_mdp_decode_header(header, payload);
ret = ob_slice(payload, ob_position(payload), ob_remaining(payload));
ob_limitsize(ret, ob_remaining(payload));
break;
default:
case OF_CRYPTO_CIPHERED:
RETURN(WHY("decryption not implemented"));
WHY("decryption not implemented");
break;
case OF_CRYPTO_SIGNED:
{
int len = ob_remaining(f->payload);
if (crypto_verify_message(f->source, ob_ptr(f->payload), &len))
RETURN(-1);
int len = ob_remaining(payload);
if (crypto_verify_message(header->source, ob_current_ptr(payload), &len))
break;
mdp->packetTypeAndFlags|=MDP_NOCRYPT;
ob_limitsize(f->payload, len + ob_position(f->payload));
RETURN(overlay_mdp_decode_header(f->payload, mdp));
ret = ob_slice(payload, ob_position(payload), len);
ob_limitsize(ret, len);
overlay_mdp_decode_header(header, ret);
break;
}
case OF_CRYPTO_CIPHERED|OF_CRYPTO_SIGNED:
{
if (0) DEBUGF("crypted MDP frame for %s", alloca_tohex_sid_t(f->destination->sid));
int nm=crypto_box_curve25519xsalsa20poly1305_BEFORENMBYTES;
//int nm=crypto_box_curve25519xsalsa20poly1305_BEFORENMBYTES;
int nb=crypto_box_curve25519xsalsa20poly1305_NONCEBYTES;
int zb=crypto_box_curve25519xsalsa20poly1305_ZEROBYTES;
int cz=crypto_box_curve25519xsalsa20poly1305_BOXZEROBYTES;
unsigned char *k=keyring_get_nm_bytes(&f->destination->sid, &f->source->sid);
if (!k)
RETURN(WHY("I don't have the private key required to decrypt that"));
if (0)
dump("frame",&f->payload->bytes[f->payload->position], ob_remaining(f->payload));
unsigned char *nonce=ob_get_bytes_ptr(f->payload, nb);
if (!nonce)
RETURN(WHYF("Expected %d bytes of nonce", nb));
int cipher_len=ob_remaining(f->payload);
unsigned char *cipher_text=ob_get_bytes_ptr(f->payload, cipher_len);
if (!cipher_text)
RETURN(WHYF("Expected %d bytes of cipher text", cipher_len));
unsigned char plain_block[cipher_len+cz];
bzero(&plain_block[0],cz);
bcopy(cipher_text,&plain_block[cz],cipher_len);
if (0) {
dump("nm bytes",k,nm);
dump("nonce",nonce,nb);
dump("cipher block",plain_block,sizeof(plain_block));
unsigned char *k=keyring_get_nm_bytes(&header->destination->sid, &header->source->sid);
if (!k){
WHY("I don't have the private key required to decrypt that");
break;
}
unsigned char *nonce=ob_get_bytes_ptr(payload, nb);
if (!nonce){
WHYF("Expected %d bytes of nonce", nb);
break;
}
int cipher_len=ob_remaining(payload);
unsigned char *cipher_text=ob_get_bytes_ptr(payload, cipher_len);
if (!cipher_text){
WHYF("Expected %d bytes of cipher text", cipher_len);
break;
}
struct overlay_buffer *plaintext = ob_new();
if (!ob_makespace(plaintext, cipher_len+cz)){
ob_free(plaintext);
break;
}
ob_limitsize(plaintext, cipher_len+cz);
unsigned char *plain_block = ob_ptr(plaintext);
bzero(plain_block, cz);
bcopy(cipher_text, &plain_block[cz], cipher_len);
cipher_len+=cz;
if (crypto_box_curve25519xsalsa20poly1305_open_afternm
(plain_block,plain_block,cipher_len,nonce,k)) {
RETURN(WHYF("crypto_box_open_afternm() failed (from %s, to %s, len %d)",
alloca_tohex_sid_t(f->source->sid), alloca_tohex_sid_t(f->destination->sid), cipher_len));
ob_free(plaintext);
WHYF("crypto_box_open_afternm() failed (from %s, to %s, len %d)",
alloca_tohex_sid_t(header->source->sid), alloca_tohex_sid_t(header->destination->sid), cipher_len);
break;
}
if (0) dump("plain block",plain_block,sizeof(plain_block));
cipher_len -= zb;
struct overlay_buffer *plaintext = ob_static(&plain_block[zb], cipher_len);
ob_limitsize(plaintext,cipher_len);
int ret=overlay_mdp_decode_header(plaintext, mdp);
ob_free(plaintext);
RETURN(ret);
}
// consume leading zero bytes
ob_get_bytes_ptr(plaintext, zb);
overlay_mdp_decode_header(header, plaintext);
ret=plaintext;
break;
}
}
RETURN(WHY("Failed to decode mdp payload"));
RETURN(ret);
OUT();
}
@ -449,20 +482,29 @@ int overlay_saw_mdp_containing_frame(struct overlay_frame *f)
*/
overlay_mdp_frame mdp;
bzero(&mdp, sizeof(overlay_mdp_frame));
struct internal_mdp_header header;
bzero(&header, sizeof header);
mdp.in.queue = f->queue;
mdp.in.ttl = f->ttl;
header.qos = mdp.in.queue = f->queue;
header.ttl = mdp.in.ttl = f->ttl;
header.source = f->source;
header.destination = f->destination;
header.modifiers = f->modifiers;
header.receive_interface = f->interface;
/* Get source and destination addresses */
mdp.in.dst.sid = (f->destination) ? f->destination->sid : SID_BROADCAST;
mdp.in.src.sid = f->source->sid;
/* copy crypto flags from frame so that we know if we need to decrypt or verify it */
if (overlay_mdp_decrypt(f,&mdp))
struct overlay_buffer *mdp_payload = overlay_mdp_decrypt(&header, f->payload);
if (mdp_payload==NULL)
RETURN(-1);
/* and do something with it! */
RETURN(overlay_saw_mdp_frame(f, &mdp));
int ret=overlay_saw_mdp_frame(&header, mdp_payload);
ob_free(mdp_payload);
RETURN(ret);
OUT();
}
@ -475,7 +517,9 @@ int overlay_mdp_swap_src_dst(overlay_mdp_frame *mdp)
return 0;
}
static int overlay_saw_mdp_frame(struct overlay_frame *frame, overlay_mdp_frame *mdp)
static int overlay_saw_mdp_frame(
struct internal_mdp_header *header,
struct overlay_buffer *payload)
{
IN();
int i;
@ -489,23 +533,17 @@ static int overlay_saw_mdp_frame(struct overlay_frame *frame, overlay_mdp_frame
if (config.debug.mdprequests)
DEBUGF("Received packet (MDP ports: src=%s*:%"PRImdp_port_t", dst=%"PRImdp_port_t")",
alloca_tohex_sid_t_trunc(mdp->out.src.sid, 14),
mdp->out.src.port, mdp->out.dst.port);
alloca_tohex_sid_t_trunc(header->source->sid, 14),
header->source_port, header->destination_port);
// TODO pass in dest subscriber as an argument, we should know it by now
struct subscriber *destination = NULL;
if (frame)
destination = frame->destination;
else if (!is_sid_t_broadcast(mdp->out.dst.sid)){
destination = find_subscriber(mdp->out.dst.sid.binary, SID_SIZE, 1);
}
// TODO filter by sid src (& dst?) port
for(i=0;i<MDP_MAX_BINDINGS;i++)
{
if (mdp_bindings[i].port!=mdp->out.dst.port)
if (mdp_bindings[i].port!=header->destination_port)
continue;
if ((!destination) || mdp_bindings[i].subscriber == destination){
if ((!header->destination) || mdp_bindings[i].subscriber == header->destination){
/* exact match, so stop searching */
match=i;
break;
@ -519,13 +557,18 @@ static int overlay_saw_mdp_frame(struct overlay_frame *frame, overlay_mdp_frame
switch(mdp_bindings[match].version){
case 0:
{
ssize_t len = overlay_mdp_relevant_bytes(mdp);
overlay_mdp_frame mdp;
ob_checkpoint(payload);
overlay_mdp_fill_legacy(header, payload, &mdp);
ob_rewind(payload);
ssize_t len = overlay_mdp_relevant_bytes(&mdp);
if (len < 0)
RETURN(WHY("unsupported MDP packet type"));
struct socket_address *client = &mdp_bindings[match].client;
if (config.debug.mdprequests)
DEBUGF("Forwarding packet to client %s", alloca_socket_address(client));
ssize_t r = sendto(mdp_sock.poll.fd,mdp,len,0, &client->addr, client->addrlen);
ssize_t r = sendto(mdp_sock.poll.fd, &mdp, len, 0, &client->addr, client->addrlen);
if (r == -1){
WHYF_perror("sendto(fd=%d,len=%zu,addr=%s)", mdp_sock.poll.fd, (size_t)len, alloca_socket_address(client));
if (errno == ENOENT){
@ -541,31 +584,36 @@ static int overlay_saw_mdp_frame(struct overlay_frame *frame, overlay_mdp_frame
}
case 1:
{
struct socket_address *client = &mdp_bindings[match].client;
struct mdp_header header;
header.local.sid=mdp->out.dst.sid;
header.local.port=mdp->out.dst.port;
header.remote.sid=mdp->out.src.sid;
header.remote.port=mdp->out.src.port;
header.qos=mdp->out.queue;
header.ttl=mdp->out.ttl;
header.flags=0;
if (mdp->packetTypeAndFlags & MDP_NOCRYPT)
header.flags|=MDP_FLAG_NO_CRYPT;
if (mdp->packetTypeAndFlags & MDP_NOSIGN)
header.flags|=MDP_FLAG_NO_SIGN;
if (mdp_bindings[match].internal)
RETURN(mdp_bindings[match].internal(&header, mdp->out.payload, mdp->out.payload_length));
RETURN(mdp_bindings[match].internal(header, payload));
struct socket_address *client = &mdp_bindings[match].client;
struct mdp_header client_header;
client_header.local.sid=header->destination?header->destination->sid:SID_BROADCAST;
client_header.local.port=header->destination_port;
client_header.remote.sid=header->source->sid;
client_header.remote.port=header->source_port;
client_header.qos=header->qos;
client_header.ttl=header->ttl;
client_header.flags=0;
if (!(header->modifiers&OF_CRYPTO_CIPHERED))
client_header.flags|=MDP_FLAG_NO_CRYPT;
if (!(header->modifiers&OF_CRYPTO_SIGNED))
client_header.flags|=MDP_FLAG_NO_SIGN;
if (config.debug.mdprequests)
DEBUGF("Forwarding packet to client v2 %s", alloca_socket_address(client));
RETURN(mdp_send2(client, &header, mdp->out.payload, mdp->out.payload_length));
size_t len = ob_remaining(payload);
const uint8_t *ptr = ob_get_bytes_ptr(payload, len);
RETURN(mdp_send2(client, &client_header, ptr, len));
}
}
} else {
/* No socket is bound, ignore the packet ... except for magic sockets */
RETURN(overlay_mdp_try_internal_services(frame, mdp));
RETURN(overlay_mdp_try_internal_services(header, payload));
}
RETURN(0);
@ -718,42 +766,39 @@ static struct overlay_buffer * encrypt_payload(
}
// encrypt or sign the plaintext, then queue the frame for transmission.
static int overlay_send_frame(
struct subscriber *source, mdp_port_t src_port,
struct subscriber *destination, mdp_port_t dst_port,
const uint8_t *payload, size_t payload_len,
uint8_t ttl, uint8_t qos, uint8_t modifiers)
static int overlay_send_frame(struct internal_mdp_header *header,
struct overlay_buffer *payload)
{
if (destination && destination->reachable == REACHABLE_SELF)
if (header->destination && header->destination->reachable == REACHABLE_SELF)
return 0;
if (ttl == 0)
ttl = PAYLOAD_TTL_DEFAULT;
else if (ttl > PAYLOAD_TTL_MAX)
if (header->ttl == 0)
header->ttl = PAYLOAD_TTL_DEFAULT;
else if (header->ttl > PAYLOAD_TTL_MAX)
return WHYF("Invalid TTL");
if (qos == 0)
qos = OQ_ORDINARY;
if (header->qos == 0)
header->qos = OQ_ORDINARY;
if (!source)
if (!header->source)
return WHYF("No source specified");
if (config.debug.mdprequests)
DEBUGF("Attempting to queue mdp packet from %s:%d to %s:%d",
alloca_tohex_sid_t(source->sid), src_port,
destination?alloca_tohex_sid_t(destination->sid):"broadcast", dst_port);
alloca_tohex_sid_t(header->source->sid), header->source_port,
header->destination?alloca_tohex_sid_t(header->destination->sid):"broadcast", header->destination_port);
/* Prepare the overlay frame for dispatch */
struct overlay_frame *frame = emalloc_zero(sizeof(struct overlay_frame));
if (!frame)
return -1;
frame->source = source;
frame->destination = destination;
frame->ttl = ttl;
frame->queue = qos;
frame->source = header->source;
frame->destination = header->destination;
frame->ttl = header->ttl;
frame->queue = header->qos;
frame->type = OF_TYPE_DATA;
frame->modifiers = modifiers;
frame->modifiers = header->modifiers;
// copy the plain text message into a new buffer, with the wire encoded port numbers
struct overlay_buffer *plaintext=ob_new();
@ -762,9 +807,9 @@ static int overlay_send_frame(
return -1;
}
overlay_mdp_encode_ports(plaintext, dst_port, src_port);
if (payload && payload_len){
ob_append_bytes(plaintext, payload, payload_len);
overlay_mdp_encode_ports(plaintext, header->destination_port, header->source_port);
if (payload && ob_remaining(payload)){
ob_append_bytes(plaintext, ob_current_ptr(payload), ob_remaining(payload));
}
if (ob_overrun(plaintext)) {
@ -785,7 +830,7 @@ static int overlay_send_frame(
about the crypto matters, and not compression that may be applied
before encryption (since applying it after is useless as ciphered
text should have maximum entropy). */
switch(modifiers) {
switch(header->modifiers) {
case OF_CRYPTO_SIGNED|OF_CRYPTO_CIPHERED:
if (!frame->destination){
ob_free(plaintext);
@ -866,29 +911,34 @@ int overlay_mdp_dispatch(overlay_mdp_frame *mdp, struct socket_address *client)
if (mdp->out.payload_length > sizeof(mdp->out.payload))
FATAL("Payload length is past the end of the buffer");
struct subscriber *source=NULL;
struct subscriber *destination=NULL;
struct internal_mdp_header header;
bzero(&header, sizeof(header));
header.source_port = mdp->out.src.port;
header.destination_port = mdp->out.dst.port;
header.ttl = mdp->out.ttl;
header.qos = mdp->out.queue;
if (is_sid_t_any(mdp->out.src.sid)){
/* set source to ourselves */
source = my_subscriber;
mdp->out.src.sid = source->sid;
header.source = my_subscriber;
mdp->out.src.sid = header.source->sid;
}else if (is_sid_t_broadcast(mdp->out.src.sid)){
/* Nope, I'm sorry but we simply can't send packets from
* broadcast addresses. */
RETURN(WHY("Packet had broadcast address as source address"));
}else{
// assume all local identities have already been unlocked and marked as SELF.
source = find_subscriber(mdp->out.src.sid.binary, SID_SIZE, 0);
if (!source){
header.source = find_subscriber(mdp->out.src.sid.binary, SID_SIZE, 0);
if (!header.source){
RETURN(WHYF("Possible spoofing attempt, tried to send a packet from %s, which is an unknown SID", alloca_tohex_sid_t(mdp->out.src.sid)));
}
if (source->reachable!=REACHABLE_SELF){
if (header.source->reachable!=REACHABLE_SELF){
RETURN(WHYF("Possible spoofing attempt, tried to send a packet from %s", alloca_tohex_sid_t(mdp->out.src.sid)));
}
}
if (overlay_mdp_check_binding(source, mdp->out.src.port, client)){
if (overlay_mdp_check_binding(header.source, header.source_port, client)){
RETURN(overlay_mdp_reply_error
(mdp_sock.poll.fd,
client,8,
@ -908,41 +958,29 @@ int overlay_mdp_dispatch(overlay_mdp_frame *mdp, struct socket_address *client)
client,5,
"Broadcast packets cannot be encrypted "));
}else{
destination = find_subscriber(mdp->out.dst.sid.binary, SID_SIZE, 1);
header.destination = find_subscriber(mdp->out.dst.sid.binary, SID_SIZE, 1);
// should we reply with an error if the destination is not currently routable?
}
if (mdp->out.ttl > PAYLOAD_TTL_MAX) {
if (header.ttl > PAYLOAD_TTL_MAX) {
RETURN(overlay_mdp_reply_error(mdp_sock.poll.fd, client, 9, "TTL out of range"));
}
if (config.debug.mdprequests)
DEBUGF("[%u] destination->sid=%s", __d, destination ? alloca_tohex_sid_t(destination->sid) : "NULL");
if (!destination || destination->reachable == REACHABLE_SELF){
/* Packet is addressed to us / broadcast, we should process it first. */
overlay_saw_mdp_frame(NULL,mdp);
if (destination) {
/* Is local, and is not broadcast, so shouldn't get sent out on the wire. */
if (config.debug.mdprequests)
DEBUGF("[%u] Local packet, not transmitting", __d);
RETURN(0);
}
}
int modifiers=0;
DEBUGF("[%u] destination->sid=%s", __d, header.destination ? alloca_tohex_sid_t(header.destination->sid) : "NULL");
switch(mdp->packetTypeAndFlags&(MDP_NOCRYPT|MDP_NOSIGN)) {
case 0:
// default to encrypted and authenticated
modifiers=OF_CRYPTO_SIGNED|OF_CRYPTO_CIPHERED;
header.modifiers = OF_CRYPTO_SIGNED|OF_CRYPTO_CIPHERED;
break;
case MDP_NOCRYPT:
// sign it, but don't encrypt it.
modifiers=OF_CRYPTO_SIGNED;
header.modifiers = OF_CRYPTO_SIGNED;
break;
case MDP_NOSIGN|MDP_NOCRYPT:
// just send the payload unmodified
modifiers=0;
header.modifiers = 0;
break;
case MDP_NOSIGN:
/* ciphered, but not signed.
@ -954,12 +992,27 @@ int overlay_mdp_dispatch(overlay_mdp_frame *mdp, struct socket_address *client)
RETURN(WHY("Not implemented"));
};
RETURN(overlay_send_frame(
source, mdp->out.src.port,
destination, mdp->out.dst.port,
mdp->out.payload, mdp->out.payload_length,
mdp->out.ttl, mdp->out.queue, modifiers
));
if (!header.destination || header.destination->reachable == REACHABLE_SELF){
/* Packet is addressed to us / broadcast, we should process it first. */
struct overlay_buffer *mdp_payload = ob_static(mdp->out.payload, mdp->out.payload_length);
ob_limitsize(mdp_payload, mdp->out.payload_length);
overlay_saw_mdp_frame(&header, mdp_payload);
ob_free(mdp_payload);
if (header.destination) {
/* Is local, and is not broadcast, so shouldn't get sent out on the wire. */
if (config.debug.mdprequests)
DEBUGF("[%u] Local packet, not transmitting", __d);
RETURN(0);
}
}
struct overlay_buffer *buff = ob_static(mdp->out.payload, mdp->out.payload_length);
ob_limitsize(buff, mdp->out.payload_length);
int ret=overlay_send_frame(&header, buff);
ob_free(buff);
RETURN(ret);
OUT();
}
@ -1121,45 +1174,43 @@ static int mdp_reply2(const struct socket_address *client, const struct mdp_head
#define mdp_reply_ok(A,B) mdp_reply2(A,B,MDP_FLAG_CLOSE,NULL,0)
static int mdp_process_identity_request(struct socket_address *client, struct mdp_header *header,
const uint8_t *payload, size_t payload_len)
struct overlay_buffer *payload)
{
if (payload_len<sizeof(struct mdp_identity_request)){
if (ob_remaining(payload)<sizeof(struct mdp_identity_request)){
mdp_reply_error(client, header);
return WHY("Request too small");
}
struct mdp_identity_request *request = (struct mdp_identity_request *)payload;
payload += sizeof(struct mdp_identity_request);
payload_len -= sizeof(struct mdp_identity_request);
struct mdp_identity_request request;
ob_get_bytes(payload, (uint8_t *)&request, sizeof(request));
switch(request->action){
switch(request.action){
case ACTION_LOCK:
switch (request->type){
switch (request.type){
case TYPE_PIN:
{
const char *pin = (char *)payload;
size_t ofs=0;
while(ofs < payload_len){
if (!payload[ofs++]){
unsigned cn;
for (cn = keyring->context_count; cn > 0;) {
keyring_context *cx = keyring->contexts[--cn];
unsigned in;
for (in = cx->identity_count; in > 0;) {
keyring_identity *id = cx->identities[--in];
if (id->subscriber != my_subscriber && strcmp(id->PKRPin, pin) == 0)
keyring_release_identity(keyring, cn, in);
}
while(1){
const char *pin = ob_get_str_ptr(payload);
if (!pin)
break;
unsigned cn;
for (cn = keyring->context_count; cn > 0;) {
keyring_context *cx = keyring->contexts[--cn];
unsigned in;
for (in = cx->identity_count; in > 0;) {
keyring_identity *id = cx->identities[--in];
if (id->subscriber != my_subscriber && strcmp(id->PKRPin, pin) == 0)
keyring_release_identity(keyring, cn, in);
}
pin=(char *)&payload[ofs++];
}
}
}
break;
case TYPE_SID:
while(payload_len>=SID_SIZE){
keyring_release_subscriber(keyring, (const sid_t*)payload);
payload+=SID_SIZE;
payload_len-=SID_SIZE;
while(1){
const sid_t *sid=(const sid_t*)ob_get_bytes_ptr(payload,SID_SIZE);
if (sid==NULL)
break;
keyring_release_subscriber(keyring, sid);
}
break;
default:
@ -1169,18 +1220,16 @@ static int mdp_process_identity_request(struct socket_address *client, struct md
break;
case ACTION_UNLOCK:
{
if (request->type!=TYPE_PIN){
if (request.type!=TYPE_PIN){
mdp_reply_error(client, header);
return WHY("Unknown request type");
}
int unlock_count=0;
const char *pin = (char *)payload;
size_t ofs=0;
while(ofs < payload_len){
if (!payload[ofs++]){
unlock_count += keyring_enter_pin(keyring, pin);
pin=(char *)&payload[ofs++];
}
while(1){
const char *pin = ob_get_str_ptr(payload);
if (!pin)
break;
unlock_count += keyring_enter_pin(keyring, pin);
}
}
break;
@ -1194,15 +1243,16 @@ static int mdp_process_identity_request(struct socket_address *client, struct md
// return one response per matching identity
static int mdp_search_identities(struct socket_address *client, struct mdp_header *header,
const uint8_t *payload, size_t payload_len)
struct overlay_buffer *payload)
{
unsigned cn=0, in=0, kp=0;
const char *tag=NULL;
const unsigned char *value=NULL;
size_t value_len=0;
size_t payload_len = ob_remaining(payload);
if (payload_len){
if (keyring_unpack_tag(payload, payload_len, &tag, &value, &value_len)){
if (keyring_unpack_tag(ob_ptr(payload), payload_len, &tag, &value, &value_len)){
mdp_reply_error(client, header);
return -1;
}
@ -1242,8 +1292,11 @@ static int mdp_search_identities(struct socket_address *client, struct mdp_heade
}
static void mdp_process_packet(struct socket_address *client, struct mdp_header *header,
const uint8_t *payload, size_t payload_len)
struct overlay_buffer *payload)
{
struct internal_mdp_header internal_header;
bzero(&internal_header, sizeof(internal_header));
if ((header->flags & MDP_FLAG_CLOSE) && header->local.port==0){
int i;
for(i=0;i<MDP_MAX_BINDINGS;i++) {
@ -1262,20 +1315,18 @@ static void mdp_process_packet(struct socket_address *client, struct mdp_header
return;
}
// find the source subscriber
struct subscriber *source=NULL;
// find local sid
if (is_sid_t_broadcast(header->local.sid)){
// leave source NULL to indicate listening on all local SID's
// note that attempting anything else will fail
}else if (is_sid_t_any(header->local.sid)){
// leaving the sid blank indicates that we should use our main identity
source = my_subscriber;
header->local.sid = source->sid;
internal_header.source = my_subscriber;
header->local.sid = my_subscriber->sid;
}else{
// find the matching sid from our keyring
source = find_subscriber(header->local.sid.binary, sizeof(header->local.sid), 0);
if (!source || source->reachable != REACHABLE_SELF){
internal_header.source = find_subscriber(header->local.sid.binary, sizeof(header->local.sid), 0);
if (!internal_header.source || internal_header.source->reachable != REACHABLE_SELF){
mdp_reply_error(client, header);
WHY("Subscriber is not local");
}
@ -1292,6 +1343,11 @@ static void mdp_process_packet(struct socket_address *client, struct mdp_header
header->local.port=next_port_binding;
}
internal_header.source_port = header->local.port;
internal_header.destination_port = header->remote.port;
internal_header.ttl = header->ttl;
internal_header.qos = header->qos;
// find matching binding
{
int i;
@ -1300,7 +1356,7 @@ static void mdp_process_packet(struct socket_address *client, struct mdp_header
free_slot=&mdp_bindings[i];
if (mdp_bindings[i].port == header->local.port
&& mdp_bindings[i].subscriber == source){
&& mdp_bindings[i].subscriber == internal_header.source){
binding = &mdp_bindings[i];
break;
@ -1330,7 +1386,7 @@ static void mdp_process_packet(struct socket_address *client, struct mdp_header
// claim binding
binding = free_slot;
binding->port = header->local.port;
binding->subscriber = source;
binding->subscriber = internal_header.source;
bcopy(&client->addr, &binding->client.addr, client->addrlen);
binding->client.addrlen = client->addrlen;
binding->binding_time=gettime_ms();
@ -1358,13 +1414,13 @@ static void mdp_process_packet(struct socket_address *client, struct mdp_header
case MDP_IDENTITY:
if (config.debug.mdprequests)
DEBUGF("Processing MDP_IDENTITY from %s", alloca_socket_address(client));
mdp_process_identity_request(client, header, payload, payload_len);
mdp_process_identity_request(client, header, payload);
break;
// seach unlocked identities
case MDP_SEARCH_IDS:
if (config.debug.mdprequests)
DEBUGF("Processing MDP_SEARCH_IDS from %s", alloca_socket_address(client));
mdp_search_identities(client, header, payload, payload_len);
mdp_search_identities(client, header, payload);
break;
default:
mdp_reply_error(client, header);
@ -1376,7 +1432,7 @@ static void mdp_process_packet(struct socket_address *client, struct mdp_header
// double check that this binding belongs to this connection
if (!binding
|| binding->internal
|| !source
|| !internal_header.source
|| header->local.port == 0
|| cmp_sockaddr(&binding->client, client)!=0){
mdp_reply_error(client, header);
@ -1384,45 +1440,24 @@ static void mdp_process_packet(struct socket_address *client, struct mdp_header
return;
}
struct subscriber *destination=NULL;
if (!is_sid_t_broadcast(header->remote.sid))
destination = find_subscriber(header->remote.sid.binary, SID_SIZE, 1);
internal_header.destination = find_subscriber(header->remote.sid.binary, SID_SIZE, 1);
int modifiers=0;
if ((header->flags & MDP_FLAG_NO_CRYPT) == 0)
modifiers|=OF_CRYPTO_CIPHERED;
internal_header.modifiers|=OF_CRYPTO_CIPHERED;
if ((header->flags & MDP_FLAG_NO_SIGN) == 0)
modifiers|=OF_CRYPTO_SIGNED;
internal_header.modifiers|=OF_CRYPTO_SIGNED;
if (!destination || destination->reachable==REACHABLE_SELF){
// TODO deprecate this mdp struct, deal with local delivery in send_frame
overlay_mdp_frame mdp;
mdp.out.src.sid = header->local.sid;
mdp.out.src.port = header->local.port;
mdp.out.dst.sid = header->remote.sid;
mdp.out.dst.port = header->remote.port;
bcopy(payload, mdp.out.payload, payload_len);
mdp.out.payload_length = payload_len;
mdp.out.ttl = header->ttl;
mdp.out.queue = header->qos;
mdp.packetTypeAndFlags=MDP_TX;
if (header->flags&MDP_FLAG_NO_CRYPT)
mdp.packetTypeAndFlags |= MDP_NOCRYPT;
if (header->flags&MDP_FLAG_NO_SIGN)
mdp.packetTypeAndFlags |= MDP_NOSIGN;
if (!internal_header.destination || internal_header.destination->reachable==REACHABLE_SELF){
if (config.debug.mdprequests)
DEBUGF("Attempting to process mdp packet locally");
overlay_saw_mdp_frame(NULL, &mdp);
overlay_saw_mdp_frame(&internal_header, payload);
}
// construct, encrypt, sign and queue the packet
if (overlay_send_frame(
source, header->local.port,
destination, header->remote.port,
payload, payload_len,
header->ttl, header->qos, modifiers
)){
&internal_header,
payload)){
mdp_reply_error(client, header);
return;
}
@ -1507,7 +1542,13 @@ static void mdp_poll2(struct sched_ent *alarm)
client.addrlen = hdr.msg_namelen;
size_t payload_len = (size_t)(len - sizeof header);
mdp_process_packet(&client, &header, payload, payload_len);
struct overlay_buffer *buff = ob_static(payload, payload_len);
ob_limitsize(buff, payload_len);
mdp_process_packet(&client, &header, buff);
ob_free(buff);
}
}

View File

@ -41,7 +41,7 @@ int rhizome_mdp_send_block(struct subscriber *dest, const rhizome_bid_t *bid, ui
RETURN(WHYF("Invalid block length %d", blockLength));
if (config.debug.rhizome_tx)
DEBUGF("Requested blocks for %s @%"PRIx64" bitmap %x", alloca_tohex_rhizome_bid_t(*bid), fileOffset, bitmap);
DEBUGF("Requested blocks for bid=%s, ver=%"PRIu64" @%"PRIx64" bitmap %x", alloca_tohex_rhizome_bid_t(*bid), version, fileOffset, bitmap);
overlay_mdp_frame reply;
bzero(&reply,sizeof(reply));
@ -109,24 +109,24 @@ int rhizome_mdp_send_block(struct subscriber *dest, const rhizome_bid_t *bid, ui
OUT();
}
int overlay_mdp_service_rhizomerequest(struct overlay_frame *frame, overlay_mdp_frame *mdp)
int overlay_mdp_service_rhizomerequest(struct internal_mdp_header *header, struct overlay_buffer *payload)
{
const rhizome_bid_t *bidp = (const rhizome_bid_t *) &mdp->out.payload[0];
uint64_t version = read_uint64(&mdp->out.payload[sizeof bidp->binary]);
uint64_t fileOffset = read_uint64(&mdp->out.payload[sizeof bidp->binary + 8]);
uint32_t bitmap = read_uint32(&mdp->out.payload[sizeof bidp->binary + 8 + 8]);
uint16_t blockLength = read_uint16(&mdp->out.payload[sizeof bidp->binary + 8 + 8 + 4]);
return rhizome_mdp_send_block(frame->source, bidp, version, fileOffset, bitmap, blockLength);
const rhizome_bid_t *bidp = (const rhizome_bid_t *) ob_get_bytes_ptr(payload, sizeof bidp->binary);
// Note, was originally built using read_uint64 which has reverse byte order of ob_get_ui64
uint64_t version = ob_get_ui64_rv(payload);
uint64_t fileOffset = ob_get_ui64_rv(payload);
uint32_t bitmap = ob_get_ui32_rv(payload);
uint16_t blockLength = ob_get_ui16_rv(payload);
if (ob_overrun(payload))
return -1;
return rhizome_mdp_send_block(header->source, bidp, version, fileOffset, bitmap, blockLength);
}
int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp)
int overlay_mdp_service_rhizomeresponse(struct overlay_buffer *payload)
{
IN();
if (!mdp->out.payload_length)
RETURN(WHYF("No payload?"));
int type=mdp->out.payload[0];
int type=ob_get(payload);
if (config.debug.rhizome_mdp_rx)
DEBUGF("Received Rhizome over MDP block, type=%02x",type);
@ -135,14 +135,14 @@ int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp)
case 'B': /* data block */
case 'T': /* terminal data block */
{
if (mdp->out.payload_length<(1+16+8+8+1))
unsigned char *bidprefix=ob_get_bytes_ptr(payload, 16);
uint64_t version=ob_get_ui64_rv(payload);
uint64_t offset=ob_get_ui64_rv(payload);
if (ob_overrun(payload))
RETURN(WHYF("Payload too short"));
unsigned char *bidprefix=&mdp->out.payload[1];
uint64_t version=read_uint64(&mdp->out.payload[1+16]);
uint64_t offset=read_uint64(&mdp->out.payload[1+16+8]);
size_t count = mdp->out.payload_length-(1+16+8+8);
unsigned char *bytes=&mdp->out.payload[1+16+8+8];
size_t count = ob_remaining(payload);
unsigned char *bytes=ob_current_ptr(payload);
if (config.debug.rhizome_mdp_rx)
DEBUGF("bidprefix=%02x%02x%02x%02x*, offset=%"PRId64", count=%zu",
bidprefix[0],bidprefix[1],bidprefix[2],bidprefix[3],offset,count);
@ -349,47 +349,72 @@ end:
RETURN(ret);
}
static int overlay_mdp_service_manifest_requests(struct overlay_frame *frame, const uint8_t *payload, size_t len)
static int overlay_mdp_service_manifest_requests(struct internal_mdp_header *header, struct overlay_buffer *payload)
{
size_t offset = 0;
while (offset < len) {
while (ob_remaining(payload)) {
const unsigned char *bar = ob_get_bytes_ptr(payload, RHIZOME_BAR_BYTES);
if (!bar)
break;
rhizome_manifest *m = rhizome_new_manifest();
if (!m)
return WHY("Unable to allocate manifest");
const unsigned char *bar = &payload[offset];
if (!rhizome_retrieve_manifest_by_prefix(&bar[RHIZOME_BAR_PREFIX_OFFSET], RHIZOME_BAR_PREFIX_BYTES, m)){
rhizome_advertise_manifest(frame->source, m);
rhizome_advertise_manifest(header->source, m);
// pre-emptively send the payload if it will fit in a single packet
if (m->filesize > 0 && m->filesize <= 1024)
rhizome_mdp_send_block(frame->source, &m->cryptoSignPublic, m->version, 0, 0, m->filesize);
rhizome_mdp_send_block(header->source, &m->cryptoSignPublic, m->version, 0, 0, m->filesize);
}
rhizome_manifest_free(m);
offset+=RHIZOME_BAR_BYTES;
}
return 0;
}
int overlay_mdp_try_internal_services(struct overlay_frame *frame, overlay_mdp_frame *mdp)
void overlay_mdp_bind_internal_services()
{
mdp_bind_internal(NULL, MDP_PORT_LINKSTATE, link_receive);
mdp_bind_internal(NULL, MDP_PORT_RHIZOME_REQUEST, overlay_mdp_service_rhizomerequest);
mdp_bind_internal(NULL, MDP_PORT_RHIZOME_MANIFEST_REQUEST, overlay_mdp_service_manifest_requests);
mdp_bind_internal(NULL, MDP_PORT_RHIZOME_SYNC, overlay_mdp_service_rhizome_sync);
}
int overlay_mdp_try_internal_services(
struct internal_mdp_header *header, struct overlay_buffer *payload)
{
IN();
switch(mdp->out.dst.port) {
case MDP_PORT_LINKSTATE: RETURN(link_receive(frame, mdp));
case MDP_PORT_VOMP: RETURN(vomp_mdp_received(mdp));
case MDP_PORT_KEYMAPREQUEST: RETURN(keyring_mapping_request(keyring, frame, mdp));
case MDP_PORT_DNALOOKUP: RETURN(overlay_mdp_service_dnalookup(mdp));
case MDP_PORT_ECHO: RETURN(overlay_mdp_service_echo(mdp));
case MDP_PORT_TRACE: RETURN(overlay_mdp_service_trace(mdp));
case MDP_PORT_PROBE: RETURN(overlay_mdp_service_probe(frame, mdp));
case MDP_PORT_STUNREQ: RETURN(overlay_mdp_service_stun_req(mdp));
case MDP_PORT_STUN: RETURN(overlay_mdp_service_stun(mdp));
case MDP_PORT_RHIZOME_REQUEST: RETURN(overlay_mdp_service_rhizomerequest(frame, mdp));
case MDP_PORT_RHIZOME_RESPONSE: RETURN(overlay_mdp_service_rhizomeresponse(mdp));
case MDP_PORT_RHIZOME_MANIFEST_REQUEST: RETURN(overlay_mdp_service_manifest_requests(frame, mdp->out.payload, mdp->out.payload_length));
case MDP_PORT_RHIZOME_SYNC: RETURN(overlay_mdp_service_rhizome_sync(frame, mdp));
overlay_mdp_frame mdp;
// TODO convert to internal bindings
switch(header->destination_port) {
case MDP_PORT_VOMP:
overlay_mdp_fill_legacy(header, payload, &mdp);
RETURN(vomp_mdp_received(&mdp));
case MDP_PORT_KEYMAPREQUEST:
overlay_mdp_fill_legacy(header, payload, &mdp);
RETURN(keyring_mapping_request(keyring, header, &mdp));
case MDP_PORT_DNALOOKUP:
overlay_mdp_fill_legacy(header, payload, &mdp);
RETURN(overlay_mdp_service_dnalookup(&mdp));
case MDP_PORT_ECHO:
overlay_mdp_fill_legacy(header, payload, &mdp);
RETURN(overlay_mdp_service_echo(&mdp));
case MDP_PORT_TRACE:
overlay_mdp_fill_legacy(header, payload, &mdp);
RETURN(overlay_mdp_service_trace(&mdp));
case MDP_PORT_PROBE:
overlay_mdp_fill_legacy(header, payload, &mdp);
RETURN(overlay_mdp_service_probe(header, &mdp));
case MDP_PORT_STUNREQ:
overlay_mdp_fill_legacy(header, payload, &mdp);
RETURN(overlay_mdp_service_stun_req(&mdp));
case MDP_PORT_STUN:
overlay_mdp_fill_legacy(header, payload, &mdp);
RETURN(overlay_mdp_service_stun(&mdp));
case MDP_PORT_RHIZOME_RESPONSE:
RETURN(overlay_mdp_service_rhizomeresponse(payload));
}
/* Unbound socket. We won't be sending ICMP style connection refused
messages, partly because they are a waste of bandwidth. */
RETURN(WHYF("Received packet for which no listening process exists (MDP ports: src=%d, dst=%d",
mdp->out.src.port,mdp->out.dst.port));
header->source_port, header->destination_port));
}

View File

@ -98,6 +98,18 @@ struct overlay_frame {
struct overlay_buffer *payload;
};
// simple representation for passing mdp packet header details
struct internal_mdp_header{
struct subscriber *source;
mdp_port_t source_port;
struct subscriber *destination;
mdp_port_t destination_port;
uint8_t ttl;
uint8_t qos;
uint8_t modifiers; // combination of OF_ flags, as per overlay_frame above
struct overlay_interface *receive_interface;
};
int op_free(struct overlay_frame *p);
struct overlay_frame *op_dup(struct overlay_frame *f);

View File

@ -927,7 +927,7 @@ int rhizome_cache_close();
int rhizome_database_filehash_from_id(const rhizome_bid_t *bidp, uint64_t version, rhizome_filehash_t *hashp);
int overlay_mdp_service_rhizome_sync(struct overlay_frame *frame, overlay_mdp_frame *mdp);
int overlay_mdp_service_rhizome_sync(struct internal_mdp_header *header, struct overlay_buffer *payload);
int rhizome_sync_announce();
int rhizome_sync_bundle_inserted(const unsigned char *bar);

View File

@ -455,32 +455,27 @@ int rhizome_sync_announce()
return 0;
}
int overlay_mdp_service_rhizome_sync(struct overlay_frame *frame, overlay_mdp_frame *mdp)
int overlay_mdp_service_rhizome_sync(struct internal_mdp_header *header, struct overlay_buffer *payload)
{
if (!frame)
return 0;
struct rhizome_sync *state = frame->source->sync_state;
struct rhizome_sync *state = header->source->sync_state;
if (!state){
state = frame->source->sync_state = emalloc_zero(sizeof(struct rhizome_sync));
state = header->source->sync_state = emalloc_zero(sizeof(struct rhizome_sync));
state->start_time=gettime_ms();
}
struct overlay_buffer *b = ob_static(mdp->out.payload, sizeof(mdp->out.payload));
ob_limitsize(b, mdp->out.payload_length);
int type = ob_get(b);
int type = ob_get(payload);
switch (type){
case MSG_TYPE_BARS:
sync_process_bar_list(frame->source, state, b);
sync_process_bar_list(header->source, state, payload);
break;
case MSG_TYPE_REQ:
{
int forwards = ob_get(b);
uint64_t token = ob_get_packed_ui64(b);
sync_send_response(frame->source, forwards, token, 0);
int forwards = ob_get(payload);
uint64_t token = ob_get_packed_ui64(payload);
sync_send_response(header->source, forwards, token, 0);
}
break;
}
ob_free(b);
rhizome_sync_send_requests(frame->source, state);
rhizome_sync_send_requests(header->source, state);
return 0;
}

View File

@ -1215,18 +1215,15 @@ int link_received_packet(struct decode_context *context, int sender_seq, char un
}
// parse incoming link details
int link_receive(struct overlay_frame *frame, overlay_mdp_frame *mdp)
int link_receive(struct internal_mdp_header *header, struct overlay_buffer *payload)
{
IN();
struct overlay_buffer *payload = ob_static(mdp->out.payload, mdp->out.payload_length);
ob_limitsize(payload, mdp->out.payload_length);
struct subscriber *sender = find_subscriber(mdp->out.src.sid.binary, SID_SIZE, 0);
struct neighbour *neighbour = get_neighbour(sender, 1);
struct neighbour *neighbour = get_neighbour(header->source, 1);
struct decode_context context;
bzero(&context, sizeof(context));
context.interface = frame->interface;
context.interface = header->receive_interface;
time_ms_t now = gettime_ms();
char changed = 0;
@ -1300,7 +1297,7 @@ int link_receive(struct overlay_frame *frame, overlay_mdp_frame *mdp)
if (receiver == my_subscriber){
// track if our neighbour is using us as an immediate neighbour, if they are we need to ack / nack promptly
neighbour->using_us = (transmitter==sender?1:0);
neighbour->using_us = (transmitter==header->source?1:0);
// for routing, we can completely ignore any links that our neighbour is using to route to us.
// we can always send packets to ourself :)
@ -1309,7 +1306,7 @@ int link_receive(struct overlay_frame *frame, overlay_mdp_frame *mdp)
struct network_destination *destination=NULL;
if (receiver == sender){
if (receiver == header->source){
// ignore other incoming links to our neighbour
if (transmitter!=my_subscriber || interface_id==-1)
continue;
@ -1353,7 +1350,7 @@ int link_receive(struct overlay_frame *frame, overlay_mdp_frame *mdp)
if (!link)
continue;
if (transmitter == my_subscriber && receiver == sender && interface_id != -1 && destination){
if (transmitter == my_subscriber && receiver == header->source && interface_id != -1 && destination){
// they can hear us? we can route through them!
version = link->link_version;
@ -1376,7 +1373,7 @@ int link_receive(struct overlay_frame *frame, overlay_mdp_frame *mdp)
// process acks / nacks
if (ack_seq!=-1){
overlay_queue_ack(sender, destination, ack_mask, ack_seq);
overlay_queue_ack(header->source, destination, ack_mask, ack_seq);
// did they miss our last ack?
if (neighbour->last_update_seq!=-1){
@ -1386,7 +1383,8 @@ int link_receive(struct overlay_frame *frame, overlay_mdp_frame *mdp)
}else if(seq_delta < 128){
// send another ack asap
if (config.debug.ack)
DEBUGF("LINK STATE; neighbour %s missed ack %d, queue another", alloca_tohex_sid_t(sender->sid), neighbour->last_update_seq);
DEBUGF("LINK STATE; neighbour %s missed ack %d, queue another",
alloca_tohex_sid_t(header->source->sid), neighbour->last_update_seq);
neighbour->next_neighbour_update=now+5;
update_alarm(__WHENCE__, neighbour->next_neighbour_update);
}
@ -1407,7 +1405,7 @@ int link_receive(struct overlay_frame *frame, overlay_mdp_frame *mdp)
}
}
send_please_explain(&context, my_subscriber, sender);
send_please_explain(&context, my_subscriber, header->source);
if (changed){
route_version++;

View File

@ -246,6 +246,7 @@ struct decode_context;
struct socket_address;
struct overlay_interface;
struct network_destination;
struct internal_mdp_header;
/* Make sure we have space to put bytes of the packet as we go along */
#define CHECK_PACKET_LEN(B) {if (((*packet_len)+(B))>=packet_maxlen) { return WHY("Packet composition ran out of space."); } }
@ -418,11 +419,15 @@ int overlay_mdp_dispatch(overlay_mdp_frame *mdp, struct socket_address *client);
void overlay_mdp_encode_ports(struct overlay_buffer *plaintext, mdp_port_t dst_port, mdp_port_t src_port);
int overlay_mdp_dnalookup_reply(const sockaddr_mdp *dstaddr, const sid_t *resolved_sidp, const char *uri, const char *did, const char *name);
struct mdp_header;
void overlay_mdp_fill_legacy(
const struct internal_mdp_header *header,
struct overlay_buffer *payload,
overlay_mdp_frame *mdp);
int mdp_bind_internal(struct subscriber *subscriber, mdp_port_t port,
int (*internal)(const struct mdp_header *header, const uint8_t *payload, size_t len));
int (*internal)(struct internal_mdp_header *header, struct overlay_buffer *payload));
int mdp_unbind_internal(struct subscriber *subscriber, mdp_port_t port,
int (*internal)(const struct mdp_header *header, const uint8_t *payload, size_t len));
int (*internal)(struct internal_mdp_header *header, struct overlay_buffer *payload));
struct vomp_call_state;
@ -493,7 +498,8 @@ int overlay_mdp_setup_sockets();
int overlay_packetradio_setup_port(struct overlay_interface *interface);
void server_config_reload(struct sched_ent *alarm);
void server_shutdown_check(struct sched_ent *alarm);
int overlay_mdp_try_internal_services(struct overlay_frame *frame, overlay_mdp_frame *mdp);
void overlay_mdp_bind_internal_services();
int overlay_mdp_try_internal_services(struct internal_mdp_header *header, struct overlay_buffer *payload);
int overlay_send_probe(struct subscriber *peer, struct network_destination *destination, int queue);
int overlay_send_stun_request(struct subscriber *server, struct subscriber *request);
void fd_periodicstats(struct sched_ent *alarm);
@ -508,7 +514,7 @@ void rhizome_server_poll(struct sched_ent *alarm);
int overlay_mdp_service_stun_req(overlay_mdp_frame *mdp);
int overlay_mdp_service_stun(overlay_mdp_frame *mdp);
int overlay_mdp_service_probe(struct overlay_frame *frame, overlay_mdp_frame *mdp);
int overlay_mdp_service_probe(struct internal_mdp_header *header, overlay_mdp_frame *mdp);
time_ms_t limit_next_allowed(struct limit_state *state);
int limit_is_allowed(struct limit_state *state);
@ -533,7 +539,7 @@ extern char crash_handler_clue[1024];
int link_received_duplicate(struct subscriber *subscriber, int previous_seq);
int link_received_packet(struct decode_context *context, int sender_seq, char unicast);
int link_receive(struct overlay_frame *frame, overlay_mdp_frame *mdp);
int link_receive(struct internal_mdp_header *header, struct overlay_buffer *payload);
void link_explained(struct subscriber *subscriber);
void link_interface_down(struct overlay_interface *interface);
int link_state_announce_links();

View File

@ -22,7 +22,6 @@ source "${0%/*}/../testframework.sh"
source "${0%/*}/../testdefs.sh"
configure_servald_server() {
add_servald_interface
executeOk_servald config \
set log.console.show_pid on \
set log.console.show_time on \
@ -63,12 +62,13 @@ setup_publish() {
set dna.helper.executable "$servald_build_root/directory_service" \
set debug.dnahelper on
foreach_instance +B +C +D executeOk_servald config set directory.service $SIDA
foreach_instance +A +B +C +D add_servald_interface
start_servald_instances +A +B +C +D
wait_until grep "DNAHELPER got STARTED ACK" $LOGA
}
doc_publish="Publish and retrieve a directory entry"
test_publish() {
wait_until grep "DNAHELPER got STARTED ACK" $LOGA
foreach_instance +B +C +D
wait_until sent_directory_request
set_instance +A
@ -95,20 +95,20 @@ interface_up() {
}
start_routing_instance() {
executeOk_servald config \
set server.interface_path "$SERVALD_VAR" \
set rhizome.enable No \
set log.console.show_pid on \
set log.console.show_time on \
set debug.mdprequests Yes \
set debug.overlayframes Yes
start_servald_server
wait_until interface_up
}
configure_node() {
executeOk_servald config \
set server.interface_path "$SERVALD_VAR" \
set rhizome.enable No \
set log.console.show_pid on \
set log.console.show_time on \
set debug.mdprequests Yes \
set debug.overlayframes Yes \
set interfaces.0.file dummy1 \
set interfaces.0.socket_type file \
set interfaces.0.send_broadcasts 0 \
set interfaces.0.drop_broadcasts on \
set interfaces.0.default_route 1 \