From 76267904d96ab56a40c3b1b4e7edc5a14b08a2b4 Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Mon, 25 Nov 2013 15:06:05 +1030 Subject: [PATCH] Start using network coding over the radio link --- commandline.c | 3 +- fakeradio.c | 16 +-- network_coding.c | 215 +++++++++++++++++++++------------------- network_coding.h | 15 +++ radio_link.c | 225 +++++++++++++++++++++++++----------------- tests/rhizomeprotocol | 1 - tests/routing | 6 +- 7 files changed, 274 insertions(+), 207 deletions(-) diff --git a/commandline.c b/commandline.c index e579d982..f3720741 100644 --- a/commandline.c +++ b/commandline.c @@ -980,8 +980,7 @@ int app_mdp_ping(const struct cli_parsed *parsed, struct cli_context *context) int r = mdp_send(mdp_sockfd, &mdp_header, payload, sizeof(payload)); if (r<0) WHY_perror("mdp_send"); - else - tx_count++; + tx_count++; } /* Now look for replies until one second has passed, and print any replies diff --git a/fakeradio.c b/fakeradio.c index 2ed5441f..4cccd3ad 100644 --- a/fakeradio.c +++ b/fakeradio.c @@ -193,14 +193,14 @@ int write_bytes(struct radio_state *s) wrote=8; if (s->last_char_ms) wrote = write(s->fd, s->rxbuffer, wrote); - if (wrote>0){ - log_time(); - fprintf(stderr, "Wrote to %s\n", s->name); - dump(NULL, s->rxbuffer, wrote); - if (wrote < s->rxb_len) - bcopy(&s->rxbuffer[wrote], s->rxbuffer, s->rxb_len - wrote); - s->rxb_len -= wrote; - } + if (wrote<=0) + abort(); + log_time(); + fprintf(stderr, "Wrote to %s\n", s->name); + dump(NULL, s->rxbuffer, wrote); + if (wrote < s->rxb_len) + bcopy(&s->rxbuffer[wrote], s->rxbuffer, s->rxb_len - wrote); + s->rxb_len -= wrote; return wrote; } diff --git a/network_coding.c b/network_coding.c index bdd371b9..0e23b950 100644 --- a/network_coding.c +++ b/network_coding.c @@ -26,21 +26,25 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include #include "network_coding.h" #include "dataformats.h" +#include "log.h" +#include "mem.h" #define FLAG_NEW 1 - - struct nc_packet{ uint8_t sequence; uint32_t combination; + size_t len; uint8_t flags; uint8_t *payload; }; struct nc_half { + uint8_t last_ack; + // Define the size parameters of the network coding data structures uint8_t window_size; // limited to the number of bits we can fit in a uint32_t uint8_t window_start; // sequence of first datagram in sending window + uint8_t unseen; // sequence of first unseen packet size_t datagram_size; // number of bytes in each fixed sized unit uint8_t deliver_next; // sequence of next packet that should be delivered // dynamically sized array of pointers to packet buffers @@ -51,6 +55,7 @@ struct nc_half { // At the receiver, this should be 2*maximum window size to allow for older packets we can't decode yet uint8_t max_queue_size; uint8_t queue_size; // # of packets currently in the array + int count_new; // number of un-sent degrees of freedom }; struct nc{ @@ -115,24 +120,24 @@ int nc_tx_has_room(struct nc *n) int nc_tx_enqueue_datagram(struct nc *n, unsigned char *d, size_t len) { - if (len!=n->tx.datagram_size){ - fprintf(stderr, "Invalid length %zd (%zd)\n", len, n->tx.datagram_size); - return -1; - } + if (len==0 || len>n->tx.datagram_size) + return WHYF("Invalid length %zd (%zd)", len, n->tx.datagram_size); if (!nc_tx_has_room(n)) return 1; // Add datagram to queue uint8_t seq = n->tx.window_start + n->tx.queue_size; int index = seq & (n->tx.max_queue_size -1); - if (n->tx.packets[index].payload){ - fprintf(stderr, "Attempted to replace TX payload %d (%d) with %d without freeing it first\n",index,n->tx.packets[index].sequence, seq); - exit(-1); - } - n->tx.packets[index].payload = malloc(len); + if (n->tx.packets[index].payload) + FATALF("Attempted to replace TX payload %d (%d) with %d without freeing it first",index,n->tx.packets[index].sequence, seq); + n->tx.packets[index].payload = emalloc(len); + if (!n->tx.packets[index].payload) + return 1; n->tx.packets[index].sequence = seq; n->tx.packets[index].combination = 0x80000000; n->tx.packets[index].flags = FLAG_NEW; + n->tx.count_new++; + n->tx.packets[index].len = len; bcopy(d, n->tx.packets[index].payload, len); n->tx.queue_size++; return 0; @@ -147,23 +152,6 @@ static int _compare_uint8(uint8_t one, uint8_t two) return -1; } -static int _nc_get_ack(struct nc_half *n, uint8_t *first_unseen, uint8_t *window_size) -{ - uint8_t seq; - - for (seq = n->window_start; ;seq++){ - int index = seq & (n->max_queue_size -1); - if (n->packets[index].sequence != seq || !n->packets[index].payload) - break; - } - *window_size = 32 - (seq - n->deliver_next); - if (*window_size > n->max_queue_size) - *window_size = n->max_queue_size; - *first_unseen = seq; - - return 0; -} - static int _nc_ack(struct nc_half *n, uint8_t first_unseen, uint8_t window_size) { if (window_size>n->max_queue_size) @@ -200,12 +188,13 @@ static uint32_t _combine_masks(const struct nc_packet *src, const struct nc_pack return dst->combination ^ mask; } -static void _combine_packets(const struct nc_packet *src, struct nc_packet *dst, size_t datagram_size) +static void _combine_packets(const struct nc_packet *src, struct nc_packet *dst) { - // TODO verify that this combination mask is set correctly. + if (dst->len < src->len) + FATALF("Expected the destination buffer to be at least %zu", src->len); dst->combination = _combine_masks(src, dst); size_t i; - for(i=0;ilen; i++) dst->payload[i]^=src->payload[i]; } @@ -234,53 +223,94 @@ static int _nc_tx_combine_random_payloads(struct nc_half *n, struct nc_packet *p if (n->packets[index].flags & FLAG_NEW){ if (added_new) continue; - _combine_packets(&n->packets[index], packet, n->datagram_size); + _combine_packets(&n->packets[index], packet); added_new = 1; + n->count_new --; n->packets[index].flags =0; continue; } if (combination&1) - _combine_packets(&n->packets[index], packet, n->datagram_size); + _combine_packets(&n->packets[index], packet); combination>>=1; } return 0; } +int nc_tx_packet_urgency(struct nc *n) +{ + // new data? send asap + if (n->tx.count_new) + return URGENCY_ASAP; + // ack required? send soon + if (n->rx.unseen != n->rx.last_ack) + return URGENCY_ASAP; + // no new data at either end? don't care + if (!n->tx.queue_size && !n->rx.queue_size) + return URGENCY_IDLE; + // send soon-ish + return URGENCY_SOON; +} + // construct a packet and return the payload size int nc_tx_produce_packet(struct nc *n, uint8_t *datagram, uint32_t buffer_size) { // TODO: Don't waste more bytes than we need to on the bitmap and sequence number if (buffer_size < n->tx.datagram_size+NC_HEADER_LEN) return -1; - - if (_nc_get_ack(&n->rx, &datagram[0], &datagram[1])) - return -1; + + uint8_t window_size = 32 - (n->rx.unseen - n->rx.deliver_next); + if (window_size > n->rx.max_queue_size) + window_size = n->rx.max_queue_size; - if (!n->tx.queue_size){ - // No data to send, just send an ack - // TODO don't ack too often - return 2; + datagram[0]=n->rx.unseen; + datagram[1]=window_size; + n->rx.last_ack = n->rx.unseen; + size_t len=2; + + if (n->tx.queue_size){ + // Produce linear combination + struct nc_packet packet={ + .sequence = n->tx.window_start, + .combination = 0, + .payload = &datagram[NC_HEADER_LEN], + .len = buffer_size - NC_HEADER_LEN, + }; + bzero(packet.payload, packet.len); + + if (_nc_tx_combine_random_payloads(&n->tx, &packet)) + return -1; + // TODO assert actual_combination? (should never be zero) + // Write out bitmap of actual combinations involved + datagram[2] = packet.sequence; + write_uint32(&datagram[3], packet.combination); + len = packet.len + NC_HEADER_LEN; + // truncate zero bytes from the end + while(!datagram[len-1]) + len--; } - - // Produce linear combination - struct nc_packet packet={ - .sequence = n->tx.window_start, - .combination = 0, - .payload = &datagram[NC_HEADER_LEN], - }; - - if (_nc_tx_combine_random_payloads(&n->tx, &packet)) - return -1; - - // TODO assert actual_combination? (should never be zero) - // Write out bitmap of actual combinations involved - datagram[2] = packet.sequence; - write_uint32(&datagram[3], packet.combination); - return NC_HEADER_LEN+n->tx.datagram_size; + return len; } +static int _nc_dump_half(struct nc_half *n) +{ + DEBUGF(" window start; %d", n->window_start); + DEBUGF(" queue size; %d", n->queue_size); + DEBUGF(" max queue size; %d", n->max_queue_size); + int i; + for (i=0;imax_queue_size;i++){ + if (!n->packets[i].payload) + continue; + DEBUGF(" %02d: 0x%02x, 0x%08x", + i, n->packets[i].sequence, n->packets[i].combination); + } + return 0; +} + + +// note, the incoming packet buffer must be allocated from the heap +// this function will take responsibility for releasing it static int _nc_rx_combine_packet(struct nc_half *n, struct nc_packet *packet) { int i; @@ -296,12 +326,12 @@ static int _nc_rx_combine_packet(struct nc_half *n, struct nc_packet *packet) // rx packet doesn't add any new information if (new_mask==0){ + free(packet->payload); return 1; } - if (new_mask < packet->combination){ - _combine_packets(&n->packets[i], packet, n->datagram_size); - } + if (new_mask < packet->combination) + _combine_packets(&n->packets[i], packet); } // the new packet must contain new information that will cause a new packet to be seen. @@ -311,15 +341,12 @@ static int _nc_rx_combine_packet(struct nc_half *n, struct nc_packet *packet) int index = packet->sequence & (n->max_queue_size -1); if (n->packets[index].payload){ - fprintf(stderr, "Attempted to replace RX payload %d (%d) with %d without freeing it first\n",index,n->packets[index].sequence, packet->sequence); - exit(-1); + _nc_dump_half(n); + free(packet->payload); + FATALF("Attempted to replace RX payload %d (%d) with %d without freeing it first",index,n->packets[index].sequence, packet->sequence); + return 1; } - // try to duplicate the payload first, we don't want to reduce existing packets if this fails. - unsigned char *dup_payload = malloc(n->datagram_size); - if (!dup_payload) - return -1; - bcopy(packet->payload, dup_payload, n->datagram_size); // reduce other stored packets for (i=0;imax_queue_size;i++){ if (!n->packets[i].payload || _compare_uint8(n->packets[i].sequence, packet->sequence) > 0) @@ -328,15 +355,23 @@ static int _nc_rx_combine_packet(struct nc_half *n, struct nc_packet *packet) // n->packets[i].sequence, n->packets[i].combination, // packet->sequence, packet->combination); uint32_t new_mask = _combine_masks(packet, &n->packets[i]); - if (new_mask < n->packets[i].combination){ - _combine_packets(packet, &n->packets[i], n->datagram_size); - } + if (new_mask < n->packets[i].combination) + _combine_packets(packet, &n->packets[i]); } // add the packet to our incoming list n->packets[index]=*packet; - n->packets[index].payload = dup_payload; n->queue_size++; + + // find the first missing seq + uint8_t seq; + for (seq = n->window_start; ;seq++){ + int index = seq & (n->max_queue_size -1); + if (n->packets[index].sequence != seq || !n->packets[index].payload) + break; + } + n->unseen = seq; + return 0; } @@ -357,28 +392,27 @@ static void _nc_rx_advance_window(struct nc_half *n, uint8_t new_window_start) } } -int nc_rx_packet(struct nc *n, uint8_t *payload, size_t len) +int nc_rx_packet(struct nc *n, const uint8_t *payload, size_t len) { - if (len!=2 && len != NC_HEADER_LEN+n->rx.datagram_size){ - fprintf(stderr, "len=%zd\n",len); - return -1; + if (len>=2){ + _nc_ack(&n->tx, payload[0], payload[1]); } - _nc_ack(&n->tx, payload[0], payload[1]); - - if (len < NC_HEADER_LEN+n->rx.datagram_size) - return 0; + if (lenrx.datagram_size struct nc_packet packet={ .sequence = new_window_start, .combination = read_uint32(&payload[3]), - .payload = &payload[NC_HEADER_LEN], + .payload = emalloc_zero(n->rx.datagram_size), + .len = n->rx.datagram_size, }; + bcopy(&payload[NC_HEADER_LEN], packet.payload, len - NC_HEADER_LEN); int r = _nc_rx_combine_packet(&n->rx, &packet); - _nc_rx_advance_window(&n->rx, new_window_start); return r; @@ -414,31 +448,12 @@ int nc_rx_next_delivered(struct nc *n, uint8_t *payload, int buffer_size) queue is full. 4. nc_tx_ack_dof() works, rejects bad input, and correctly releases buffers. 5. nc_tx_random_linear_combination() works, rejects bad input, and produces valid - linear combinations of the enqueued datagrams, and never produces all zeroes. + linear combinations of the enqueuedumd datagrams, and never produces all zeroes. 6. nc_rx_linear_combination() works, rejects bad input 7. nc_rx_linear_combination() rejects when RX queue full, when combination starts before current window. */ -static int _nc_dump_half(struct nc_half *n) -{ - fprintf(stderr, " window start; %d\n", n->window_start); - fprintf(stderr, " queue size; %d\n", n->queue_size); - fprintf(stderr, " max queue size; %d\n", n->max_queue_size); - int i; - for (i=0;imax_queue_size;i++){ - if (!n->packets[i].payload) - continue; - fprintf(stderr, " %02d: 0x%02x, 0x%08x ", - i, n->packets[i].sequence, n->packets[i].combination); - int j; - for(j=0;j<32;j++) - fprintf(stderr, "%0d",(n->packets[i].combination>>(31-j))&1); - fprintf(stderr, "\n"); - } - return 0; -} - static void _nc_dump(struct nc *n) { fprintf(stderr, "TX\n"); diff --git a/network_coding.h b/network_coding.h index d1d54fe4..b512ef09 100644 --- a/network_coding.h +++ b/network_coding.h @@ -3,4 +3,19 @@ #define NC_HEADER_LEN 7 +#define URGENCY_ASAP 0 +#define URGENCY_SOON 1 +#define URGENCY_IDLE 2 + +struct nc; + +struct nc *nc_new(uint8_t max_window_size, uint8_t datagram_size); +int nc_free(struct nc *n); +int nc_tx_has_room(struct nc *n); +int nc_tx_enqueue_datagram(struct nc *n, unsigned char *d, size_t len); +int nc_tx_produce_packet(struct nc *n, uint8_t *datagram, uint32_t buffer_size); +int nc_rx_packet(struct nc *n, const uint8_t *payload, size_t len); +int nc_rx_next_delivered(struct nc *n, uint8_t *payload, int buffer_size); +int nc_tx_packet_urgency(struct nc *n); + #endif \ No newline at end of file diff --git a/radio_link.c b/radio_link.c index 628a4bf9..620f9a65 100644 --- a/radio_link.c +++ b/radio_link.c @@ -49,6 +49,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include "overlay_buffer.h" #include "golay.h" #include "radio_link.h" +#include "network_coding.h" #define MAVLINK_MSG_ID_RADIO 166 #define MAVLINK_MSG_ID_DATASTREAM 67 @@ -73,18 +74,19 @@ struct mavlink_RADIO_v10 { */ +#define PAYLOAD_FRAGMENT 0xFF #define FEC_LENGTH 32 #define FEC_MAX_BYTES 223 #define RADIO_HEADER_LENGTH 6 -#define RADIO_USED_HEADER_LENGTH 4 +#define RADIO_ACTUAL_HEADER_LENGTH 4 #define RADIO_CRC_LENGTH 2 -#define LINK_PAYLOAD_MTU (LINK_MTU - FEC_LENGTH - RADIO_HEADER_LENGTH - RADIO_CRC_LENGTH) +#define LINK_NC_MTU (LINK_MTU - FEC_LENGTH - RADIO_ACTUAL_HEADER_LENGTH) +#define LINK_PAYLOAD_MTU (LINK_NC_MTU - NC_HEADER_LEN) struct radio_link_state{ - // next seq for transmission - int tx_seq; - + struct nc *network_coding; + // small buffer for parsing incoming bytes from the serial interface, // looking for recoverable link layer packets // should be large enough to hold at least one packet from the remote end @@ -94,8 +96,6 @@ struct radio_link_state{ // decoded length of next link layer packet // including all header and footer bytes int payload_length; - // last rx seq for reassembly - int seq; // offset within payload that we have found a valid looking header int payload_start; // offset after payload_start for incoming bytes @@ -152,6 +152,7 @@ int decode_rs_8(data_t *data, int *eras_pos, int no_eras, int pad); int radio_link_free(struct overlay_interface *interface) { if (interface->radio_link_state){ + nc_free(interface->radio_link_state->network_coding); free(interface->radio_link_state); interface->radio_link_state=NULL; } @@ -161,6 +162,7 @@ int radio_link_free(struct overlay_interface *interface) int radio_link_init(struct overlay_interface *interface) { interface->radio_link_state = emalloc_zero(sizeof(struct radio_link_state)); + interface->radio_link_state->network_coding = nc_new(16, LINK_PAYLOAD_MTU); return 0; } @@ -171,47 +173,34 @@ void radio_link_state_html(struct strbuf *b, struct overlay_interface *interface strbuf_sprintf(b, "Remote RSSI: %ddB
", state->remote_rssi); } -// write a new link layer packet to interface->txbuffer -// consuming more bytes from the next interface->tx_packet if required -static int radio_link_encode_packet(struct radio_link_state *link_state) +static int encode_next_packet(struct radio_link_state *link_state) { - // if we have nothing interesting left to send, don't create a packet at all - if (!link_state->tx_packet) - return 0; + while (link_state->tx_packet && nc_tx_has_room(link_state->network_coding)){ + // queue one packet + uint8_t next_packet[LINK_PAYLOAD_MTU]; + bzero(next_packet, sizeof(next_packet)); + ob_checkpoint(link_state->tx_packet); - int count = ob_remaining(link_state->tx_packet); - int startP = (ob_position(link_state->tx_packet) == 0); - int endP = 1; - if (count > LINK_PAYLOAD_MTU){ - count = LINK_PAYLOAD_MTU; - endP = 0; - } - - link_state->txbuffer[0]=0xfe; // mavlink v1.0 magic header - - // we need to add FEC_LENGTH for FEC, but the length field doesn't include the expected headers or CRC - int len = count + FEC_LENGTH - RADIO_CRC_LENGTH; - link_state->txbuffer[1]=len; // mavlink payload length - link_state->txbuffer[2]=(len & 0xF); - link_state->txbuffer[3]=0; - - // add golay encoding so that decoding the actual length is more reliable - golay_encode(&link_state->txbuffer[1]); - - - link_state->txbuffer[4]=(link_state->tx_seq++) & 0x3f; - if (startP) link_state->txbuffer[4]|=0x40; - if (endP) link_state->txbuffer[4]|=0x80; - link_state->txbuffer[5]=MAVLINK_MSG_ID_DATASTREAM; - - ob_get_bytes(link_state->tx_packet, &link_state->txbuffer[6], count); - - encode_rs_8(&link_state->txbuffer[4], &link_state->txbuffer[6+count], FEC_MAX_BYTES - (count+2)); - link_state->tx_bytes=len + RADIO_CRC_LENGTH + RADIO_HEADER_LENGTH; - if (endP){ - ob_free(link_state->tx_packet); - link_state->tx_packet=NULL; - overlay_queue_schedule_next(gettime_ms()); + int count = ob_remaining(link_state->tx_packet); + if (count > LINK_PAYLOAD_MTU -1){ + count = LINK_PAYLOAD_MTU -1; + next_packet[0]=PAYLOAD_FRAGMENT; + }else + next_packet[0]=count; + + ob_get_bytes(link_state->tx_packet, &next_packet[1], count); + if (nc_tx_enqueue_datagram(link_state->network_coding, next_packet, LINK_PAYLOAD_MTU)==0){ + if (config.debug.radio_link) + DEBUGF("Enqueued fragment len %d", count+1); + }else{ + ob_rewind(link_state->tx_packet); + break; + } + if (!ob_remaining(link_state->tx_packet)){ + ob_free(link_state->tx_packet); + link_state->tx_packet=NULL; + overlay_queue_schedule_next(gettime_ms()); + } } return 0; } @@ -240,6 +229,41 @@ int radio_link_queue_packet(struct overlay_interface *interface, struct overlay_ return 0; } +static int send_link_packet(struct overlay_interface *interface) +{ + struct radio_link_state *link_state = interface->radio_link_state; + + int data_length = nc_tx_produce_packet(link_state->network_coding, + &link_state->txbuffer[RADIO_ACTUAL_HEADER_LENGTH], LINK_NC_MTU); + + // if we have nothing interesting to send, don't create a packet at all + if (data_length <=0) + return 0; + + link_state->txbuffer[0]=0xfe; // mavlink v1.0 magic header + + // the current firmware assumes that the whole packet contains 6 bytes of header and 2 bytes of crc + // that are not counted in the length. + int whole_packet = data_length + FEC_LENGTH + RADIO_ACTUAL_HEADER_LENGTH; + int radio_length = whole_packet - RADIO_HEADER_LENGTH - RADIO_CRC_LENGTH; + link_state->txbuffer[1]=radio_length; // mavlink payload length + link_state->txbuffer[2]=(radio_length & 0xF); + link_state->txbuffer[3]=0; + + // add golay encoding so that decoding the actual length is more reliable + golay_encode(&link_state->txbuffer[1]); + + encode_rs_8(&link_state->txbuffer[RADIO_ACTUAL_HEADER_LENGTH], + &link_state->txbuffer[RADIO_ACTUAL_HEADER_LENGTH + data_length], + FEC_MAX_BYTES - data_length); + + link_state->tx_bytes=whole_packet; + if (config.debug.radio_link) + DEBUGF("Produced packet len %d", whole_packet); + + return 0; +} + static int build_heartbeat(struct radio_link_state *link_state) { int count=9; @@ -280,6 +304,9 @@ int radio_link_tx(struct overlay_interface *interface) while(1){ + // encode more data if we have a packet waiting, and have space + encode_next_packet(link_state); + if (link_state->tx_bytes){ if (link_state->next_tx_allowed > now){ interface->alarm.alarm = link_state->next_tx_allowed; @@ -318,14 +345,25 @@ int radio_link_tx(struct overlay_interface *interface) if (link_state->remaining_space < LINK_MTU + HEARTBEAT_SIZE) link_state->next_heartbeat = now; - if (!link_state->tx_packet){ - // finished current packet, wait for more. - interface->alarm.alarm = next_tick; + int urgency = nc_tx_packet_urgency(link_state->network_coding); + time_ms_t delay = 500; + switch (urgency){ + case URGENCY_ASAP: + delay=20; + break; + case URGENCY_SOON: + delay=200; + break; + } + + if (link_state->last_packet + delay > now){ + interface->alarm.alarm = link_state->last_packet + delay; + if (interface->alarm.alarm > next_tick) + interface->alarm.alarm = next_tick; break; } - // encode another packet fragment - radio_link_encode_packet(link_state); + send_link_packet(interface); link_state->last_packet = now; } @@ -369,7 +407,7 @@ static int parse_heartbeat(struct radio_link_state *state, const unsigned char * } static int radio_link_parse(struct overlay_interface *interface, struct radio_link_state *state, - size_t packet_length, uint8_t *payload, int *backtrack) + int packet_length, unsigned char *payload, int *backtrack) { *backtrack=0; if (packet_length==17){ @@ -385,57 +423,58 @@ static int radio_link_parse(struct overlay_interface *interface, struct radio_li return 0; } - size_t data_bytes = packet_length - (RADIO_USED_HEADER_LENGTH + FEC_LENGTH); + payload += RADIO_ACTUAL_HEADER_LENGTH; + packet_length -= RADIO_ACTUAL_HEADER_LENGTH + FEC_LENGTH; - int errors=decode_rs_8(&payload[4], NULL, 0, FEC_MAX_BYTES - data_bytes); + int errors=decode_rs_8(payload, NULL, 0, FEC_MAX_BYTES - packet_length); if (errors==-1){ if (config.debug.radio_link) DEBUGF("Reed-Solomon error correction failed"); return 0; } *backtrack=errors; - data_bytes -= 2; - int seq=payload[4]&0x3f; - if (config.debug.radio_link){ - DEBUGF("Received RS protected message, len: %zd, errors: %d, seq: %d, flags:%s%s", - data_bytes, - errors, - seq, - payload[4]&0x40?" start":"", - payload[4]&0x80?" end":""); + int rx = nc_rx_packet(state->network_coding, payload, packet_length); + if (config.debug.radio_link) + DEBUGF("RX returned %d", rx); + + if (rx==0){ + // we received an interesting packet, can we deliver anything? + uint8_t fragment[LINK_PAYLOAD_MTU]; + while(1){ + int len=nc_rx_next_delivered(state->network_coding, fragment, sizeof(fragment)); + if (len<=0) + break; + int fragment_len=fragment[0]; + if (fragment_len == PAYLOAD_FRAGMENT) + fragment_len = len -1; + + // is this fragment length invalid? + if (fragment_len > len -1) + state->packet_length=sizeof(state->dst); + + // can we fit this fragment into our payload buffer? + if (fragment_len+state->packet_length < sizeof(state->dst)){ + bcopy(&fragment[1], &state->dst[state->packet_length], fragment_len); + state->packet_length+=fragment_len; + + // is this the last fragment? + if (fragment[0] != PAYLOAD_FRAGMENT){ + if (config.debug.radio_link) + DEBUGF("PDU Complete (length=%d)",state->packet_length); + + if (packetOkOverlay(interface, state->dst, state->packet_length, -1, NULL, 0)){ + dump("Invalid packet?", state->dst, state->packet_length); + } + } + } + + // reset the buffer for the next packet + if (fragment[0] != PAYLOAD_FRAGMENT) + state->packet_length=0; + } } - if (seq != ((state->seq+1)&0x3f)){ - // reject partial packet if we missed a sequence number - if (config.debug.radio_link) - DEBUGF("Rejecting packet, sequence jumped from %d to %d", state->seq, seq); - state->packet_length=sizeof(state->dst)+1; - } - - if (payload[4]&0x40){ - // start a new packet - state->packet_length=0; - } - - state->seq=payload[4]&0x3f; - if (state->packet_length + data_bytes > sizeof(state->dst)){ - if (config.debug.radio_link) - DEBUG("Fragmented packet is too long or a previous piece was missed - discarding"); - state->packet_length=sizeof(state->dst)+1; - return 1; - } - - bcopy(&payload[RADIO_HEADER_LENGTH], &state->dst[state->packet_length], data_bytes); - state->packet_length+=data_bytes; - - if (payload[4]&0x80) { - if (config.debug.radio_link) - DEBUGF("PDU Complete (length=%d)",state->packet_length); - - packetOkOverlay(interface, state->dst, state->packet_length, -1, NULL, 0); - state->packet_length=sizeof(state->dst)+1; - } return 1; } diff --git a/tests/rhizomeprotocol b/tests/rhizomeprotocol index 265dc926..683d6b1d 100755 --- a/tests/rhizomeprotocol +++ b/tests/rhizomeprotocol @@ -702,7 +702,6 @@ start_radio_instance() { set debug.rhizome_tx on \ set debug.rhizome_rx on \ set debug.throttling on \ - set debug.radio_link on \ set rhizome.advertise.interval 5000 \ set rhizome.rhizome_mdp_block_size 375 \ set log.console.level debug \ diff --git a/tests/routing b/tests/routing index 2d2381e6..710f7922 100755 --- a/tests/routing +++ b/tests/routing @@ -296,10 +296,10 @@ setup_simulate_extender() { foreach_instance +A +B start_routing_instance } test_simulate_extender() { - wait_until path_exists +A +B - wait_until path_exists +B +A + wait_until --timeout=20 path_exists +A +B + wait_until --timeout=20 path_exists +B +A set_instance +A - executeOk_servald mdp ping --timeout=3 $SIDB 1 + executeOk_servald mdp ping --interval=0.500 --timeout=10 $SIDB 20 tfw_cat --stdout --stderr } teardown_simulate_extender() {