Allow for payloads with a pre-determined destination

This commit is contained in:
Jeremy Lakeman 2012-12-03 13:44:31 +10:30
parent d5f78bcffe
commit a67e4114a8
7 changed files with 95 additions and 90 deletions

View File

@ -67,7 +67,7 @@ struct subscriber{
// if direct, or unicast, where do we send packets?
struct overlay_interface *interface;
// if reachable==REACHABLE_UNICAST send packets to this address, else use the interface broadcast address
// if reachable&REACHABLE_UNICAST send packets to this address, else use the interface broadcast address
struct sockaddr_in address;
// public signing key details for remote peers

View File

@ -43,11 +43,13 @@ struct overlay_frame {
// null if destination is broadcast
struct subscriber *destination;
struct subscriber *next_hop;
struct subscriber *source;
/* IPv4 node frame was received from (if applicable) */
struct sockaddr *recvaddr;
/* IPv4 address the frame was received from, or should be sent to */
int destination_resolved;
struct sockaddr_in recvaddr;
overlay_interface *interface;
/* Actual payload */

View File

@ -189,19 +189,16 @@ int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet, s
f.interface = interface;
if (recvaddr->sa_family==AF_INET){
f.recvaddr=recvaddr;
f.recvaddr=*((struct sockaddr_in *)recvaddr);
if (debug&DEBUG_OVERLAYFRAMES)
DEBUG("Received overlay packet");
} else {
if (interface->fileP) {
/* dummy interface, so tell to use localhost */
loopback.sin_family = AF_INET;
loopback.sin_port = 0;
loopback.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
f.recvaddr=(struct sockaddr *)&loopback;
} else
/* some other sort of interface, so we can't offer any help here */
f.recvaddr=NULL;
f.recvaddr.sin_family = AF_INET;
f.recvaddr.sin_port = 0;
f.recvaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
}
}
overlay_address_parse(&context, b, &context.sender);

View File

@ -74,7 +74,7 @@ int overlay_frame_build_header(struct decode_context *context, struct overlay_bu
}
int overlay_frame_append_payload(struct decode_context *context, overlay_interface *interface,
struct overlay_frame *p, struct subscriber *next_hop, struct overlay_buffer *b)
struct overlay_frame *p, struct overlay_buffer *b)
{
/* Convert a payload (frame) structure into a series of bytes.
Assumes that any encryption etc has already been done.
@ -100,7 +100,7 @@ int overlay_frame_append_payload(struct decode_context *context, overlay_interfa
if (overlay_frame_build_header(context, headers,
p->queue, p->type, p->modifiers, p->ttl,
(p->destination?NULL:&p->broadcast_id), next_hop,
(p->destination?NULL:&p->broadcast_id), p->next_hop,
p->destination, p->source))
goto cleanup;

View File

@ -161,10 +161,12 @@ int overlay_payload_enqueue(struct overlay_frame *p)
if (!p) return WHY("Cannot queue NULL");
if (p->destination){
int r = subscriber_is_reachable(p->destination);
if (!(r&REACHABLE))
return WHYF("Cannot send %x packet, destination %s is %s", p->type, alloca_tohex_sid(p->destination->sid), r==REACHABLE_SELF?"myself":"unreachable");
if (!p->destination_resolved){
if (p->destination){
int r = subscriber_is_reachable(p->destination);
if (!(r&REACHABLE))
return WHYF("Cannot send %x packet, destination %s is %s", p->type, alloca_tohex_sid(p->destination->sid), r==REACHABLE_SELF?"myself":"unreachable");
}
}
if (p->queue>=OQ_MAX)
@ -225,16 +227,14 @@ int overlay_payload_enqueue(struct overlay_frame *p)
overlay_update_queue_schedule(queue, p);
if (0) overlay_queue_dump(queue);
return 0;
}
static void
overlay_init_packet(struct outgoing_packet *packet, overlay_interface *interface, int tick){
overlay_init_packet(struct outgoing_packet *packet, overlay_interface *interface, struct sockaddr_in addr, int tick){
packet->interface = interface;
packet->i = (interface - overlay_interfaces);
packet->dest=interface->broadcast_address;
packet->dest=addr;
packet->buffer=ob_new();
packet->add_advertisements=1;
ob_limitsize(packet->buffer, packet->interface->mtu);
@ -301,86 +301,92 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
even if we hear it from somewhere else in the mean time
*/
struct subscriber *next_hop = frame->destination;
if (next_hop){
// Where do we need to route this payload next?
if (!frame->destination_resolved){
frame->next_hop = frame->destination;
int r = subscriber_is_reachable(next_hop);
// first, should we try to bounce this payload off the directory service?
if (r==REACHABLE_NONE &&
directory_service &&
next_hop!=directory_service){
next_hop=directory_service;
r=subscriber_is_reachable(directory_service);
}
// do we need to route via a neighbour?
if (r&REACHABLE_INDIRECT){
next_hop = next_hop->next_hop;
r = subscriber_is_reachable(next_hop);
}
if (!(r&REACHABLE_DIRECT))
goto skip;
// ignore resend logic for unicast packets, where wifi gives better resilience
if (r&REACHABLE_UNICAST)
frame->send_copies=1;
if (packet->buffer){
// is this packet going our way?
if(packet->interface != next_hop->interface)
if (frame->next_hop){
// Where do we need to route this payload next?
int r = subscriber_is_reachable(frame->next_hop);
// first, should we try to bounce this payload off the directory service?
if (r==REACHABLE_NONE &&
directory_service &&
frame->next_hop!=directory_service){
frame->next_hop=directory_service;
r=subscriber_is_reachable(directory_service);
}
// do we need to route via a neighbour?
if (r&REACHABLE_INDIRECT){
frame->next_hop = frame->next_hop->next_hop;
r = subscriber_is_reachable(frame->next_hop);
}
if (!(r&REACHABLE_DIRECT))
goto skip;
if ((r&REACHABLE_BROADCAST) && packet->unicast)
goto skip;
frame->interface = frame->next_hop->interface;
if ((r&REACHABLE_UNICAST) &&
(!packet->unicast || packet->dest.sin_addr.s_addr != next_hop->address.sin_addr.s_addr))
goto skip;
}else{
// start a new packet buffer.
overlay_init_packet(packet, next_hop->interface, 0);
if(r&REACHABLE_UNICAST){
packet->unicast_subscriber = next_hop;
packet->dest = next_hop->address;
packet->unicast=1;
}
}
}else{
if (packet->buffer){
if (frame->broadcast_sent_via[packet->i])
goto skip;
}else{
// find an interface that we haven't broadcast on yet
int i;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
{
if (overlay_interfaces[i].state==INTERFACE_STATE_UP
&& !frame->broadcast_sent_via[i]){
overlay_init_packet(packet, &overlay_interfaces[i], 0);
break;
}
}
frame->recvaddr = frame->next_hop->address;
// ignore resend logic for unicast packets, where wifi gives better resilience
frame->send_copies=1;
}else
frame->recvaddr = frame->interface->broadcast_address;
if (!packet->buffer){
// huh, we don't need to send it anywhere?
frame = overlay_queue_remove(queue, frame);
continue;
frame->destination_resolved=1;
}else{
if (packet->buffer){
// check if we can stuff into this packet
if (frame->broadcast_sent_via[packet->i])
goto skip;
frame->interface = packet->interface;
frame->recvaddr = packet->interface->broadcast_address;
}else{
// find an interface that we haven't broadcast on yet
frame->interface = NULL;
int i;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
{
if (overlay_interfaces[i].state==INTERFACE_STATE_UP
&& !frame->broadcast_sent_via[i]){
frame->interface = &overlay_interfaces[i];
frame->recvaddr = overlay_interfaces[i].broadcast_address;
break;
}
}
if (!frame->interface){
// huh, we don't need to send it anywhere?
frame = overlay_queue_remove(queue, frame);
continue;
}
}
}
}
if (!packet->buffer){
overlay_init_packet(packet, frame->interface, frame->recvaddr, 0);
if (frame->next_hop && (frame->next_hop->reachable&REACHABLE_UNICAST)){
packet->unicast_subscriber = frame->next_hop;
packet->unicast=1;
}
}else{
// is this packet going our way?
if (frame->interface!=packet->interface || memcmp(&packet->dest, &frame->recvaddr, sizeof(packet->dest))!=0)
goto skip;
}
if (debug&DEBUG_OVERLAYFRAMES){
DEBUGF("Sending payload type %x len %d for %s via %s", frame->type, ob_position(frame->payload),
frame->destination?alloca_tohex_sid(frame->destination->sid):"All",
next_hop?alloca_tohex_sid(next_hop->sid):alloca_tohex(frame->broadcast_id.id, BROADCAST_LEN));
frame->next_hop?alloca_tohex_sid(frame->next_hop->sid):alloca_tohex(frame->broadcast_id.id, BROADCAST_LEN));
}
if (overlay_frame_append_payload(&packet->context, packet->interface, frame, next_hop, packet->buffer))
if (overlay_frame_append_payload(&packet->context, packet->interface, frame, packet->buffer))
// payload was not queued
goto skip;
@ -391,7 +397,7 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
// mark the payload as sent
int keep_payload = 0;
if (next_hop){
if (frame->next_hop){
frame->send_copies --;
if (frame->send_copies>0)
keep_payload=1;
@ -494,7 +500,7 @@ overlay_tick_interface(int i, time_ms_t now) {
// initialise the packet buffer
bzero(&packet, sizeof(struct outgoing_packet));
overlay_init_packet(&packet, &overlay_interfaces[i], 1);
overlay_init_packet(&packet, &overlay_interfaces[i], overlay_interfaces[i].broadcast_address, 1);
/* Stuff more payloads from queues and send it */
overlay_fill_send_packet(&packet, now);

View File

@ -327,7 +327,7 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long
if (!rhizome_db) { RETURN(0); }
int ad_frame_type=ob_get(f->payload);
struct sockaddr_in httpaddr = *(struct sockaddr_in *)f->recvaddr;
struct sockaddr_in httpaddr = f->recvaddr;
httpaddr.sin_port = htons(RHIZOME_HTTP_PORT);
int manifest_length;
rhizome_manifest *m=NULL;

View File

@ -443,7 +443,7 @@ time_ms_t overlay_time_until_next_tick();
int overlay_add_selfannouncement(struct decode_context *context, int interface,struct overlay_buffer *b);
int overlay_frame_append_payload(struct decode_context *context, overlay_interface *interface,
struct overlay_frame *p, struct subscriber *next_hop, struct overlay_buffer *b);
struct overlay_frame *p, struct overlay_buffer *b);
int overlay_packet_init_header(struct decode_context *context, struct overlay_buffer *buff);
int overlay_frame_build_header(struct decode_context *context, struct overlay_buffer *buff,
int queue, int type, int modifiers, int ttl,