Allow for different routes per packet destination

This commit is contained in:
Jeremy Lakeman 2014-09-23 13:47:48 +09:30
parent c7e397cadf
commit 4440cf3af0
6 changed files with 46 additions and 35 deletions

View File

@ -190,10 +190,12 @@ int overlay_send_probe(struct subscriber *peer, struct network_destination *dest
bzero(frame,sizeof(struct overlay_frame));
frame->type=OF_TYPE_DATA;
frame->source = my_subscriber;
frame->next_hop = frame->destination = peer;
frame->destination = peer;
frame->ttl=1;
frame->queue=queue;
frame->destinations[frame->destination_count++].destination=add_destination_ref(destination);
unsigned dest = frame->destination_count++;
frame->destinations[dest].destination=add_destination_ref(destination);
frame->destinations[dest].next_hop = peer;
if ((frame->payload = ob_new()) == NULL) {
op_free(frame);
return -1;

View File

@ -35,6 +35,8 @@ struct packet_destination{
time_ms_t transmit_time;
// the actual out going stream for this packet
struct network_destination *destination;
// next hop in the route
struct subscriber *next_hop;
};
struct overlay_frame {

View File

@ -78,8 +78,8 @@ static int overlay_frame_build_header(int packet_version, struct decode_context
}
int overlay_frame_append_payload(struct decode_context *context, int encapsulation,
struct overlay_frame *p, struct overlay_buffer *b,
char will_retransmit)
struct overlay_frame *p, struct subscriber *next_hop,
struct overlay_buffer *b, char will_retransmit)
{
/* Convert a payload (frame) structure into a series of bytes.
Assumes that any encryption etc has already been done.
@ -96,7 +96,7 @@ int overlay_frame_append_payload(struct decode_context *context, int encapsulati
if (overlay_frame_build_header(p->packet_version, context, b,
p->queue, p->type, p->modifiers, will_retransmit,
p->ttl, p->mdp_sequence&0xFF,
broadcast, p->next_hop,
broadcast, next_hop,
p->destination, p->source) == -1)
goto cleanup;

View File

@ -373,10 +373,6 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
frame->destinations[i].destination->unicast?"unicast":"broadcast",
frame->destinations[i].destination->interface->name);
}
// degrade packet version if required to reach the destination
if (frame->packet_version > frame->next_hop->max_packet_version)
frame->packet_version = frame->next_hop->max_packet_version;
}
if (frame->mdp_sequence == -1){
@ -404,6 +400,10 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
remove_destination(frame, i);
continue;
}
// degrade packet version if required to reach the destination
if (frame->destinations[i].next_hop
&& frame->packet_version > frame->destinations[i].next_hop->max_packet_version)
frame->packet_version = frame->destinations[i].next_hop->max_packet_version;
if (frame->destinations[i].transmit_time &&
frame->destinations[i].transmit_time + frame->destinations[i].destination->resend_delay > now)
@ -466,28 +466,29 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
if (frame->packet_version<1 || frame->resend<=0 || packet->seq==-1)
will_retransmit=0;
if (overlay_frame_append_payload(&packet->context, packet->destination->encapsulation, frame, packet->buffer, will_retransmit)){
if (overlay_frame_append_payload(&packet->context, packet->destination->encapsulation, frame,
frame->destinations[destination_index].next_hop, packet->buffer, will_retransmit)){
// payload was not queued, delay the next attempt slightly
frame->delay_until = now + 5;
goto skip;
}
frame->transmit_count++;
{
struct packet_destination *dest = &frame->destinations[destination_index];
dest->sent_sequence = dest->destination->sequence_number;
dest->transmit_time = now;
if (debug)
strbuf_sprintf(debug, "%d(%s), ", frame->mdp_sequence, frame->whence.function);
if (config.debug.overlayframes)
DEBUGF("Appended payload %p, %d type %x len %zd for %s via %s",
frame, frame->mdp_sequence,
frame->type, ob_position(frame->payload),
frame->destination?alloca_tohex_sid_t(frame->destination->sid):"All",
dest->next_hop?alloca_tohex_sid_t(dest->next_hop->sid):alloca_tohex(frame->broadcast_id.id, BROADCAST_LEN));
}
frame->transmit_count++;
if (debug)
strbuf_sprintf(debug, "%d(%s), ", frame->mdp_sequence, frame->whence.function);
if (config.debug.overlayframes){
DEBUGF("Appended payload %p, %d type %x len %zd for %s via %s",
frame, frame->mdp_sequence,
frame->type, ob_position(frame->payload),
frame->destination?alloca_tohex_sid_t(frame->destination->sid):"All",
frame->next_hop?alloca_tohex_sid_t(frame->next_hop->sid):alloca_tohex(frame->broadcast_id.id, BROADCAST_LEN));
}
// dont retransmit if we aren't sending sequence numbers, or we've been asked not to
if (!will_retransmit){
@ -588,7 +589,7 @@ int overlay_queue_ack(struct subscriber *neighbour, struct network_destination *
if (j>=0){
int frame_seq = frame->destinations[j].sent_sequence;
if (frame_seq >=0 && (frame->next_hop == neighbour || !frame->destination)){
if (frame_seq >=0 && (frame->destinations[j].next_hop == neighbour || !frame->destination)){
int seq_delta = (ack_seq - frame_seq)&0xFF;
char acked = (seq_delta==0 || (seq_delta <= 32 && ack_mask&(1<<(seq_delta-1))))?1:0;

View File

@ -1027,17 +1027,17 @@ struct link_in * get_neighbour_link(struct neighbour *neighbour, struct overlay_
int link_add_destinations(struct overlay_frame *frame)
{
if (frame->destination){
frame->next_hop = frame->destination;
struct subscriber *next_hop = frame->destination;
// if the destination is unreachable, but we have a reachable directory service
// forward it through the directory service
if (frame->next_hop->reachable==REACHABLE_NONE
if (next_hop->reachable==REACHABLE_NONE
&& directory_service
&& frame->next_hop!=directory_service
&& next_hop!=directory_service
&& directory_service->reachable&REACHABLE)
frame->next_hop = directory_service;
next_hop = directory_service;
if (frame->next_hop->reachable==REACHABLE_NONE){
if (next_hop->reachable==REACHABLE_NONE){
// if the destination is a network neighbour, but we haven't established any viable route yet
// we need to add all likely links so that we can send ack's and bootstrap the routing table
struct neighbour *n = get_neighbour(frame->destination, 0);
@ -1045,19 +1045,25 @@ int link_add_destinations(struct overlay_frame *frame)
struct link_out *out = n->out_links;
time_ms_t now = gettime_ms();
while(out){
if (out->timeout>=now && frame->destination_count < MAX_PACKET_DESTINATIONS)
frame->destinations[frame->destination_count++].destination = add_destination_ref(out->destination);
if (out->timeout>=now && frame->destination_count < MAX_PACKET_DESTINATIONS){
unsigned dest = frame->destination_count++;
frame->destinations[dest].destination = add_destination_ref(out->destination);
frame->destinations[dest].next_hop = next_hop;
}
out = out->_next;
}
}
}
if ((frame->next_hop->reachable&REACHABLE)==REACHABLE_INDIRECT)
frame->next_hop = frame->next_hop->next_hop;
if ((next_hop->reachable&REACHABLE)==REACHABLE_INDIRECT)
next_hop = next_hop->next_hop;
if (frame->next_hop->reachable&REACHABLE_DIRECT){
if (frame->destination_count < MAX_PACKET_DESTINATIONS)
frame->destinations[frame->destination_count++].destination=add_destination_ref(frame->next_hop->destination);
if (next_hop->reachable&REACHABLE_DIRECT){
if (frame->destination_count < MAX_PACKET_DESTINATIONS){
unsigned dest = frame->destination_count++;
frame->destinations[dest].destination=add_destination_ref(next_hop->destination);
frame->destinations[dest].next_hop = next_hop;
}
}
}else{
char added_interface[OVERLAY_MAX_INTERFACES];

View File

@ -188,8 +188,8 @@ int process_incoming_frame(time_ms_t now, struct overlay_interface *interface,
int overlay_frame_process(struct overlay_interface *interface, struct overlay_frame *f);
int overlay_frame_append_payload(struct decode_context *context, int encapsulation,
struct overlay_frame *p, struct overlay_buffer *b,
char will_retransmit);
struct overlay_frame *p, struct subscriber *next_hop,
struct overlay_buffer *b, char will_retransmit);
int overlay_packet_init_header(int packet_version, int encapsulation,
struct decode_context *context, struct overlay_buffer *buff,
char unicast, char interface, int seq);