Adjust packet format for better duplicate detection

This commit is contained in:
Jeremy Lakeman 2013-05-24 13:52:31 +09:30
parent 0966840f4e
commit 8ab5073869
8 changed files with 168 additions and 106 deletions

View File

@ -65,6 +65,9 @@ struct subscriber{
// result of routing calculations;
int reachable;
// highest seen packet version
int max_packet_version;
// if indirect, who is the next hop?
struct subscriber *next_hop;
@ -96,6 +99,8 @@ struct broadcast{
struct decode_context{
struct overlay_interface *interface;
int sender_interface;
int packet_version;
int encapsulation;
struct sockaddr_in addr;
int invalid_addresses;
struct overlay_frame *please_explain;

View File

@ -42,6 +42,7 @@ struct overlay_frame {
/* What sequence number have we used to send this packet on this interface.
*/
int interface_sent_sequence[OVERLAY_MAX_INTERFACES];
int32_t mdp_sequence;
time_ms_t interface_dont_send_until[OVERLAY_MAX_INTERFACES];
struct broadcast broadcast_id;
@ -57,6 +58,7 @@ struct overlay_frame {
struct sockaddr_in recvaddr;
overlay_interface *interface;
char unicast;
int packet_version;
time_ms_t dont_send_until;
/* Actual payload */

View File

@ -31,15 +31,19 @@ struct sockaddr_in loopback;
#define PACKET_INTERFACE (1<<1)
#define PACKET_SEQ (1<<2)
int overlay_packet_init_header(int encapsulation,
#define SUPPORTED_PACKET_VERSION 1
int overlay_packet_init_header(int packet_version, int encapsulation,
struct decode_context *context, struct overlay_buffer *buff,
struct subscriber *destination,
char unicast, char interface, int seq){
if (packet_version <0 || packet_version > SUPPORTED_PACKET_VERSION)
return WHY("Invalid packet version");
if (encapsulation !=ENCAP_OVERLAY && encapsulation !=ENCAP_SINGLE)
return WHY("Invalid packet encapsulation");
if (ob_append_byte(buff, 0))
if (ob_append_byte(buff, packet_version))
return -1;
if (ob_append_byte(buff, encapsulation))
return -1;
@ -224,18 +228,21 @@ int parseMdpPacketHeader(struct decode_context *context, struct overlay_frame *f
}else
frame->type=OF_TYPE_DATA;
if (flags & PAYLOAD_FLAG_DUPLICATE){
int previous_seq = ob_get(buffer);
if (previous_seq == -1)
RETURN(WHY("Unable to read previous seq"));
if (context->packet_version >0){
int seq = ob_get(buffer);
if (seq == -1)
RETURN(WHY("Unable to read packet seq"));
// TODO unicast
if (link_received_duplicate(context->sender, context->interface, context->sender_interface, previous_seq, 0)){
if (config.debug.overlayframes)
DEBUG("Don't process or forward duplicate payloads");
forward=process=0;
if ((flags & PAYLOAD_FLAG_ONE_HOP) || !(flags & PAYLOAD_FLAG_TO_BROADCAST)){
if (link_received_duplicate(context->sender, context->interface, context->sender_interface, seq, 0)){
if (config.debug.overlayframes)
DEBUG("Don't process or forward duplicate payloads");
forward=process=0;
}
}
}
frame->modifiers=flags;
frame->packet_version = context->packet_version;
// if we can't understand one of the addresses, skip processing the payload
if ((forward||process)&&context->invalid_addresses){
@ -252,6 +259,14 @@ int parseEnvelopeHeader(struct decode_context *context, struct overlay_interface
IN();
time_ms_t now = gettime_ms();
context->packet_version = ob_get(buffer);
if (context->packet_version < 0 || context->packet_version > SUPPORTED_PACKET_VERSION)
RETURN(WHY("Packet version not recognised."));
context->encapsulation = ob_get(buffer);
if (context->encapsulation !=ENCAP_OVERLAY && context->encapsulation !=ENCAP_SINGLE)
RETURN(WHY("Invalid packet encapsulation"));
if (overlay_address_parse(context, buffer, &context->sender))
RETURN(WHY("Unable to parse sender"));
@ -275,7 +290,10 @@ int parseEnvelopeHeader(struct decode_context *context, struct overlay_interface
DEBUG("Completely ignore packets I sent");
RETURN(1);
}
if (context->sender->max_packet_version < context->packet_version)
context->sender->max_packet_version = context->packet_version;
// TODO probe unicast links when we detect an address change.
// if this is a dummy announcement for a node that isn't in our routing table
@ -390,13 +408,6 @@ int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet, s
if (config.debug.overlayframes)
DEBUG("Received overlay packet");
if (ob_get(b)!=0)
RETURN(WHY("Packet type not recognised."));
int encapsulation = ob_get(b);
if (encapsulation !=ENCAP_OVERLAY && encapsulation !=ENCAP_SINGLE)
RETURN(WHY("Invalid packet encapsulation"));
int ret=parseEnvelopeHeader(&context, interface, (struct sockaddr_in *)recvaddr, b);
if (ret){
ob_free(b);
@ -414,13 +425,14 @@ int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet, s
break;
}
// TODO allow for one byte length
// TODO allow for one byte length?
unsigned int payload_len;
switch (encapsulation){
switch (context.encapsulation){
case ENCAP_SINGLE:
payload_len = ob_remaining(b);
break;
default:
case ENCAP_OVERLAY:
payload_len = ob_get_ui16(b);
if (payload_len > ob_remaining(b)){

View File

@ -23,8 +23,9 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "overlay_buffer.h"
#include "overlay_packet.h"
static int overlay_frame_build_header(struct decode_context *context, struct overlay_buffer *buff,
int queue, int type, int modifiers, int ttl, int previous_seq,
static int overlay_frame_build_header(int packet_version, struct decode_context *context,
struct overlay_buffer *buff,
int queue, int type, int modifiers, int ttl, int sequence,
struct broadcast *broadcast, struct subscriber *next_hop,
struct subscriber *destination, struct subscriber *source)
{
@ -46,8 +47,6 @@ static int overlay_frame_build_header(struct decode_context *context, struct ove
if (type!=OF_TYPE_DATA)
flags |= PAYLOAD_FLAG_LEGACY_TYPE;
if (previous_seq>=0)
flags |= PAYLOAD_FLAG_DUPLICATE;
if (ob_append_byte(buff, flags)) return -1;
@ -74,9 +73,9 @@ static int overlay_frame_build_header(struct decode_context *context, struct ove
if (ob_append_byte(buff, type)) return -1;
}
if (flags & PAYLOAD_FLAG_DUPLICATE){
if (ob_append_byte(buff, previous_seq)) return -1;
}
if (packet_version>0)
if (ob_append_byte(buff, sequence))
return -1;
return 0;
}
@ -104,11 +103,9 @@ int overlay_frame_append_payload(struct decode_context *context, overlay_interfa
if ((!p->destination) && !is_all_matching(p->broadcast_id.id,BROADCAST_LEN,0)){
broadcast = &p->broadcast_id;
}
int i = interface - overlay_interfaces;
int previous_seq = p->destination ? p->interface_sent_sequence[i]:-1;
if (overlay_frame_build_header(context, b,
p->queue, p->type, p->modifiers, p->ttl, previous_seq,
if (overlay_frame_build_header(p->packet_version, context, b,
p->queue, p->type, p->modifiers, p->ttl, p->mdp_sequence&0xFF,
broadcast, p->next_hop,
p->destination, p->source))
goto cleanup;
@ -137,22 +134,22 @@ int single_packet_encapsulation(struct overlay_buffer *b, struct overlay_frame *
if (frame->source_full)
my_subscriber->send_full=1;
int seq = interface->sequence_number++;
if (overlay_packet_init_header(ENCAP_SINGLE, &context, b, NULL, 0, interface_number, seq))
return -1;
if (overlay_packet_init_header(frame->packet_version, ENCAP_SINGLE, &context, b, NULL, 0, interface_number, seq))
return WHY("Failed to init header");
struct broadcast *broadcast=NULL;
if ((!frame->destination) && !is_all_matching(frame->broadcast_id.id,BROADCAST_LEN,0))
broadcast = &frame->broadcast_id;
if (overlay_frame_build_header(&context, b,
if (overlay_frame_build_header(frame->packet_version, &context, b,
frame->queue, frame->type,
frame->modifiers, frame->ttl, -1,
frame->modifiers, frame->ttl, frame->mdp_sequence & 0xFF,
broadcast, frame->next_hop,
frame->destination, frame->source))
return -1;
return WHY("Failed to build header");
if (ob_append_buffer(b, frame->payload))
return -1;
return WHY("Failed to append payload");
return 0;
}

View File

@ -40,6 +40,7 @@ overlay_txqueue overlay_tx[OQ_MAX];
struct outgoing_packet{
overlay_interface *interface;
int32_t seq;
int packet_version;
int i;
struct subscriber *unicast_subscriber;
struct sockaddr_in dest;
@ -48,6 +49,7 @@ struct outgoing_packet{
struct decode_context context;
};
int32_t mdp_sequence=0;
struct sched_ent next_packet;
struct profile_total send_packet;
@ -196,7 +198,7 @@ int overlay_payload_enqueue(struct overlay_frame *p)
if (p->destination){
// allow the packet to be resent
if (p->resend == 0)
p->resend = 3;
p->resend = 1;
}else{
int i;
int interface_copies = 0;
@ -222,7 +224,7 @@ int overlay_payload_enqueue(struct overlay_frame *p)
// allow the packet to be resent
if (p->resend == 0)
p->resend = 3 * interface_copies;
p->resend = 1;
}
}
@ -231,7 +233,10 @@ int overlay_payload_enqueue(struct overlay_frame *p)
p->prev=l;
p->next=NULL;
p->enqueued_at=gettime_ms();
p->mdp_sequence = -1;
// it should be safe to try sending all packets with an mdp sequence
if (p->packet_version==0)
p->packet_version=1;
queue->last=p;
if (!queue->first) queue->first=p;
queue->length++;
@ -243,20 +248,22 @@ int overlay_payload_enqueue(struct overlay_frame *p)
}
static void
overlay_init_packet(struct outgoing_packet *packet, struct subscriber *destination, int unicast,
overlay_init_packet(struct outgoing_packet *packet, struct subscriber *destination,
int unicast, int packet_version,
overlay_interface *interface, struct sockaddr_in addr){
packet->interface = interface;
packet->i = (interface - overlay_interfaces);
packet->dest=addr;
packet->buffer=ob_new();
packet->seq=-1;
packet->packet_version = packet_version;
if (unicast)
packet->unicast_subscriber = destination;
else
packet->seq = interface->sequence_number = (interface->sequence_number + 1)&0xFFFF;
ob_limitsize(packet->buffer, packet->interface->mtu);
overlay_packet_init_header(ENCAP_OVERLAY, &packet->context, packet->buffer,
overlay_packet_init_header(packet_version, ENCAP_OVERLAY, &packet->context, packet->buffer,
destination, unicast, packet->i, packet->seq);
packet->header_length = ob_position(packet->buffer);
}
@ -400,7 +407,11 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
frame->unicast = 1;
}else
frame->recvaddr = frame->interface->broadcast_address;
// degrade packet version if required to reach the destination
if (frame->packet_version > frame->next_hop->max_packet_version)
frame->packet_version = frame->next_hop->max_packet_version;
frame->destination_resolved=1;
}else{
@ -455,29 +466,17 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
if (limit_is_allowed(&frame->interface->transfer_limit))
goto skip;
if (frame->interface->encapsulation==ENCAP_SINGLE){
// send MDP packets without aggregating them together
struct overlay_buffer *buff = ob_new();
int ret=single_packet_encapsulation(buff, frame);
if (!ret){
ret=overlay_broadcast_ensemble(frame->interface, &frame->recvaddr, ob_ptr(buff), ob_position(buff));
}
ob_free(buff);
if (ret)
goto skip;
goto sent;
}
if (frame->source_full)
my_subscriber->send_full=1;
overlay_init_packet(packet, frame->next_hop, frame->unicast, frame->interface, frame->recvaddr);
if (frame->interface->encapsulation!=ENCAP_SINGLE)
overlay_init_packet(packet, frame->next_hop, frame->unicast, frame->packet_version, frame->interface, frame->recvaddr);
}else{
// is this packet going our way?
if (frame->interface!=packet->interface || memcmp(&packet->dest, &frame->recvaddr, sizeof(packet->dest))!=0){
if (frame->interface!=packet->interface ||
frame->packet_version==packet->packet_version ||
memcmp(&packet->dest, &frame->recvaddr, sizeof(packet->dest))!=0){
goto skip;
}
}
@ -491,21 +490,45 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
}
}
if (overlay_frame_append_payload(&packet->context, packet->interface, frame, packet->buffer)){
// payload was not queued
goto skip;
if (frame->mdp_sequence == -1){
frame->mdp_sequence = mdp_sequence = (mdp_sequence+1)&0xFFFF;
}else if(((mdp_sequence - frame->mdp_sequence)&0xFFFF) >= 64){
// too late, we've sent too many packets for the next hop to correctly de-duplicate
if (config.debug.overlayframes)
DEBUGF("Retransmition of frame %p mdp seq %d, is too late to be de-duplicated", frame, frame->mdp_sequence, frame->interface_sent_sequence[packet->i], packet->seq);
frame = overlay_queue_remove(queue, frame);
continue;
}else{
if (config.debug.overlayframes)
DEBUGF("Retransmitting frame %p mdp seq %d, from packet seq %d in seq %d", frame, frame->mdp_sequence, frame->interface_sent_sequence[packet->i], packet->seq);
}
if (frame->interface_sent_sequence[packet->i]>=0 && config.debug.overlayframes)
DEBUGF("Retransmitted frame %p from seq %d in seq %d", frame, frame->interface_sent_sequence[packet->i], packet->seq);
frame->interface_sent_sequence[packet->i] = packet->seq;
if (frame->interface->encapsulation==ENCAP_SINGLE){
// send MDP packets without aggregating them together
struct overlay_buffer *buff = ob_new();
sent:
int ret=single_packet_encapsulation(buff, frame);
if (!ret){
ret=overlay_broadcast_ensemble(frame->interface, &frame->recvaddr, ob_ptr(buff), ob_position(buff));
}
ob_free(buff);
if (ret)
goto skip;
}else{
if (overlay_frame_append_payload(&packet->context, packet->interface, frame, packet->buffer)){
// payload was not queued
goto skip;
}
}
frame->interface_sent_sequence[packet->i] = packet->seq;
frame->interface_dont_send_until[packet->i] = now+200;
if (config.debug.overlayframes){
DEBUGF("Sent payload %p type %x len %d for %s via %s, seq %d",
frame,
DEBUGF("Sent payload %p, %d type %x len %d for %s via %s, seq %d",
frame, frame->mdp_sequence,
frame->type, ob_position(frame->payload),
frame->destination?alloca_tohex_sid(frame->destination->sid):"All",
frame->next_hop?alloca_tohex_sid(frame->next_hop->sid):alloca_tohex(frame->broadcast_id.id, BROADCAST_LEN),
@ -519,9 +542,8 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
// mark the payload as sent
frame->resend --;
if (frame->destination_resolved){
if (frame->resend>0 && frame->next_hop && packet->seq!=-1 && (!frame->unicast)){
if (frame->resend>0 && frame->packet_version>=1 && frame->next_hop && packet->seq !=-1 && (!frame->unicast)){
frame->dont_send_until = now+200;
frame->destination_resolved = 0;
if (config.debug.overlayframes)
@ -529,7 +551,7 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
goto skip;
}
}else{
if (frame->resend<=0 || packet->seq==-1 || frame->unicast){
if (frame->resend<=0 || frame->packet_version<1 || packet->seq==-1 || frame->unicast){
// dont retransmit if we aren't sending sequence numbers, or we've run out of allowed resends
frame->interface_sent_sequence[packet->i] = FRAME_DONT_SEND;
}
@ -598,7 +620,7 @@ int overlay_send_tick_packet(struct overlay_interface *interface){
struct outgoing_packet packet;
bzero(&packet, sizeof(struct outgoing_packet));
packet.seq=-1;
overlay_init_packet(&packet, NULL, 0, interface, interface->broadcast_address);
overlay_init_packet(&packet, NULL, 0, 0, interface, interface->broadcast_address);
overlay_fill_send_packet(&packet, gettime_ms());
return 0;
@ -646,8 +668,6 @@ int overlay_queue_ack(struct subscriber *neighbour, struct overlay_interface *in
if (config.debug.overlayframes)
DEBUGF("Requeue packet %p to %s sent by seq %d due to ack of seq %d", frame, alloca_tohex_sid(neighbour->sid), frame_seq, ack_seq);
frame->dont_send_until = now;
// dont count the next retransmission against the time based retries
frame->resend ++;
overlay_calc_queue_time(&overlay_tx[i], frame);
}
}

View File

@ -69,6 +69,7 @@ struct neighbour_link{
time_ms_t link_timeout;
char unicast;
int ack_sequence;
uint64_t ack_mask;
};
@ -87,6 +88,9 @@ struct neighbour{
// otherwise we don't care too much about packet loss.
char using_us;
int mdp_ack_sequence;
uint64_t mdp_ack_mask;
// next link update
time_ms_t next_neighbour_update;
time_ms_t last_update;
@ -162,6 +166,7 @@ static struct neighbour *get_neighbour(struct subscriber *subscriber, char creat
n->subscriber = subscriber;
n->_next = neighbours;
n->last_update_seq = -1;
n->mdp_ack_sequence = -1;
// TODO measure min/max rtt
n->rtt = 120;
neighbours = n;
@ -754,29 +759,31 @@ int link_state_ack_soon(struct subscriber *subscriber){
}
// our neighbour is sending a duplicate frame, did we see the original?
int link_received_duplicate(struct subscriber *subscriber, struct overlay_interface *interface, int sender_interface, int previous_seq, int unicast)
int link_received_duplicate(struct subscriber *subscriber, struct overlay_interface *interface, int sender_interface, int payload_seq, int unicast)
{
// TODO better handling of unicast routes
if (unicast)
return 0;
struct neighbour *neighbour = get_neighbour(subscriber, 0);
if (!neighbour)
return 0;
struct neighbour_link *link=get_neighbour_link(neighbour, interface, sender_interface, unicast);
if (neighbour->mdp_ack_sequence != -1){
if (neighbour->mdp_ack_sequence == payload_seq){
return 1;
}
int offset = (link->ack_sequence - 1 - previous_seq)&0xFF;
if (offset >= 64 || (link->ack_mask & (1<<offset))){
if (config.debug.linkstate && config.debug.verbose)
DEBUGF("LINK STATE; dropping duplicate %s, saw previous seq %d",
alloca_tohex_sid(subscriber->sid), previous_seq);
return 1;
}
if (config.debug.linkstate && config.debug.verbose)
DEBUGF("LINK STATE; allowing duplicate %s, didn't see seq %d",
alloca_tohex_sid(subscriber->sid), previous_seq);
int offset = (neighbour->mdp_ack_sequence - 1 - payload_seq)&0xFF;
if (offset < 64){
if (neighbour->mdp_ack_mask & (1<<offset)){
return 1;
}
neighbour->mdp_ack_mask |= (1<<offset);
}else{
int offset = (payload_seq - neighbour->mdp_ack_sequence - 1)&0xFF;
neighbour->mdp_ack_mask = (neighbour->mdp_ack_mask << 1) | 1;
neighbour->mdp_ack_mask = neighbour->mdp_ack_mask << offset;
neighbour->mdp_ack_sequence = payload_seq;
}
}else
neighbour->mdp_ack_sequence = payload_seq;
return 0;
}

View File

@ -514,7 +514,7 @@ time_ms_t overlay_time_until_next_tick();
int overlay_frame_append_payload(struct decode_context *context, overlay_interface *interface,
struct overlay_frame *p, struct overlay_buffer *b);
int single_packet_encapsulation(struct overlay_buffer *b, struct overlay_frame *frame);
int overlay_packet_init_header(int encapsulation,
int overlay_packet_init_header(int packet_version, int encapsulation,
struct decode_context *context, struct overlay_buffer *buff,
struct subscriber *destination,
char unicast, char interface, int seq);

View File

@ -47,7 +47,7 @@ path_exists() {
set_instance $next_inst
executeOk_servald route print
tfw_log "Looking for link from $next_inst to $I"
if ! grep "^${!sidvar}:.*BROADCAST.*:dummy.*:0*\$" $_tfw_tmp/stdout; then
if ! grep "^${!sidvar}:\(BROADCAST \|UNICAST \)\{1,\}:dummy.*:0*\$" $_tfw_tmp/stdout; then
tfw_log "Link not found"
return 1
fi
@ -69,13 +69,13 @@ start_routing_instance() {
set server.interface_path "$SERVALD_VAR" \
set monitor.socket "org.servalproject.servald.monitor.socket.$TFWUNIQUE.$instance_name" \
set mdp.socket "org.servalproject.servald.mdp.socket.$TFWUNIQUE.$instance_name" \
set log.console.level debug \
set log.console.show_pid on \
set log.console.show_time on \
set debug.mdprequests yes \
set debug.linkstate yes \
set debug.verbose yes \
set debug.overlayrouting yes \
set log.console.level debug \
set log.console.show_pid on \
set log.console.show_time on \
set rhizome.enable no
start_servald_server
wait_until interface_up
@ -372,16 +372,37 @@ setup_ping_unreliable() {
foreach_instance +A +B add_interface 1
foreach_instance +A +B \
executeOk_servald config \
set interfaces.1.drop_broadcasts 20 \
set debug.overlayframes 1 \
set debug.rejecteddata 1
set interfaces.1.drop_broadcasts 40
foreach_instance +A +B start_routing_instance
}
test_ping_unreliable() {
wait_until path_exists +A +B
wait_until path_exists +B +A
set_instance +A
executeOk_servald mdp ping --interval=0.020 --timeout=2 $SIDB 500
executeOk_servald mdp ping --interval=0.020 --timeout=3 $SIDB 500
tfw_cat --stdout --stderr
}
doc_ping_unreliable8="Ping over a 2-hop unreliable link"
setup_ping_unreliable8() {
setup_servald
assert_no_servald_processes
foreach_instance +A +B +C create_single_identity
foreach_instance +A +B add_interface 1
foreach_instance +A +B \
executeOk_servald config \
set interfaces.1.drop_broadcasts 20
foreach_instance +B +C add_interface 2
foreach_instance +B +C \
executeOk_servald config \
set interfaces.2.drop_broadcasts 20
foreach_instance +A +B +C start_routing_instance
}
test_ping_unreliable8() {
wait_until path_exists +A +B +C
wait_until path_exists +C +B +A
set_instance +A
executeOk_servald mdp ping --interval=0.020 --timeout=3 $SIDC 500
tfw_cat --stdout --stderr
}
@ -393,9 +414,7 @@ setup_brping_unreliable() {
foreach_instance +A +B add_interface 1
foreach_instance +A +B \
executeOk_servald config \
set interfaces.1.drop_broadcasts 20 \
set debug.overlayframes 1 \
set debug.rejecteddata 1
set interfaces.1.drop_broadcasts 20
foreach_instance +A +B start_routing_instance
}
test_brping_unreliable() {