diff --git a/network_coding.c b/network_coding.c index 0e23b950..394db53a 100644 --- a/network_coding.c +++ b/network_coding.c @@ -296,14 +296,19 @@ int nc_tx_produce_packet(struct nc *n, uint8_t *datagram, uint32_t buffer_size) static int _nc_dump_half(struct nc_half *n) { DEBUGF(" window start; %d", n->window_start); + DEBUGF(" window size; %d", n->window_size); + DEBUGF(" last_ack; %d", n->last_ack); + DEBUGF(" unseen; %d", n->unseen); DEBUGF(" queue size; %d", n->queue_size); DEBUGF(" max queue size; %d", n->max_queue_size); + DEBUGF(" delivered; %d", n->deliver_next); int i; for (i=0;i<n->max_queue_size;i++){ if (!n->packets[i].payload) continue; DEBUGF(" %02d: 0x%02x, 0x%08x", i, n->packets[i].sequence, n->packets[i].combination); + dump("packet", n->packets[i].payload, n->packets[i].len); } return 0; } @@ -339,12 +344,19 @@ static int _nc_rx_combine_packet(struct nc_half *n, struct nc_packet *packet) packet->sequence += shift; packet->combination <<= shift; + if (_compare_uint8(packet->sequence, n->window_start)<0){ + // um, no. I'm not storing an old packet that I've just delivered. + // we don't reject these packets earlier, firstly because they shouldn't happen + // secondly because we might be able to learn something based on + // packets that haven't been delivered yet + free(packet->payload); + return 1; + } + int index = packet->sequence & (n->max_queue_size -1); if (n->packets[index].payload){ _nc_dump_half(n); - free(packet->payload); - FATALF("Attempted to replace RX payload %d (%d) with %d without freeing it first",index,n->packets[index].sequence, packet->sequence); - return 1; + FATALF("Attempted to replace RX payload %d (%d) with %d [%08x] without freeing it first",index,n->packets[index].sequence, packet->sequence, packet->combination); } // reduce other stored packets @@ -426,18 +438,19 @@ int nc_rx_next_delivered(struct nc *n, uint8_t *payload, int buffer_size) int index = n->rx.deliver_next & (n->rx.max_queue_size -1); if (!n->rx.packets[index].payload || + n->rx.packets[index].sequence!=n->rx.deliver_next || n->rx.packets[index].combination != 0x80000000) return 0; - bcopy(n->rx.packets[index].payload, payload, n->rx.datagram_size); - n->rx.deliver_next++; + bcopy(n->rx.packets[index].payload, payload, n->rx.packets[index].len); // drop the payload if the sender has already advanced - if (_compare_uint8(n->rx.deliver_next, n->rx.window_start) <= 0){ + if (_compare_uint8(n->rx.deliver_next, n->rx.window_start) < 0){ free(n->rx.packets[index].payload); n->rx.packets[index].payload=NULL; n->rx.queue_size--; } - return n->rx.datagram_size; + n->rx.deliver_next++; + return n->rx.packets[index].len; } #ifdef RUNTESTS @@ -459,7 +472,6 @@ static void _nc_dump(struct nc *n) fprintf(stderr, "TX\n"); _nc_dump_half(&n->tx); fprintf(stderr, "RX\n"); - fprintf(stderr, " delivered; %d\n", n->rx.deliver_next); _nc_dump_half(&n->rx); } diff --git a/overlay_interface.c b/overlay_interface.c index 48e96c91..3c3bcea8 100644 --- a/overlay_interface.c +++ b/overlay_interface.c @@ -410,7 +410,8 @@ overlay_interface_init(const char *name, struct in_addr src_addr, struct in_addr interface->ctsrts = ifconfig->ctsrts; set_destination_ref(&interface->destination, NULL); interface->destination = new_destination(interface, ifconfig->encapsulation); - + interface->reliable = 0; + /* Pick a reasonable default MTU. This will ultimately get tuned by the bandwidth and other properties of the interface */ interface->mtu = 1200; @@ -531,6 +532,7 @@ overlay_interface_init(const char *name, struct in_addr src_addr, struct in_addr switch (ifconfig->socket_type) { case SOCK_STREAM: radio_link_init(interface); + interface->reliable = 1; interface->alarm.poll.events=POLLIN|POLLOUT; watch(&interface->alarm); diff --git a/overlay_queue.c b/overlay_queue.c index b239b79a..e91eb9d5 100644 --- a/overlay_queue.c +++ b/overlay_queue.c @@ -452,7 +452,8 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim } char will_retransmit=1; - if (frame->packet_version<1 || frame->resend<=0 || packet->seq==-1) + if (packet->destination->interface->reliable || + frame->packet_version<1 || frame->resend<=0 || packet->seq==-1) will_retransmit=0; if (overlay_frame_append_payload(&packet->context, packet->destination->encapsulation, frame, packet->buffer, will_retransmit)){ diff --git a/radio_link.c b/radio_link.c index 620f9a65..d88c3918 100644 --- a/radio_link.c +++ b/radio_link.c @@ -191,7 +191,7 @@ static int encode_next_packet(struct radio_link_state *link_state) ob_get_bytes(link_state->tx_packet, &next_packet[1], count); if (nc_tx_enqueue_datagram(link_state->network_coding, next_packet, LINK_PAYLOAD_MTU)==0){ if (config.debug.radio_link) - DEBUGF("Enqueued fragment len %d", count+1); + DEBUGF("Enqueued fragment len %d of %d", count+1, ob_limit(link_state->tx_packet)); }else{ ob_rewind(link_state->tx_packet); break; diff --git a/serval.h b/serval.h index d97115f2..4c2e9970 100644 --- a/serval.h +++ b/serval.h @@ -355,6 +355,8 @@ typedef struct overlay_interface { int socket_type; char send_broadcasts; char prefer_unicast; + char reliable; + /* Not necessarily the real MTU, but the largest frame size we are willing to TX. For radio links the actual maximum and the maximum that is likely to be delivered reliably are potentially two quite different values. */