diff --git a/network_coding.c b/network_coding.c index ca3abf04..a824b0e9 100644 --- a/network_coding.c +++ b/network_coding.c @@ -25,9 +25,13 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include #include +#define FLAG_NEW 1 +#define HEADER_LEN 16 + struct nc_packet{ uint32_t sequence; uint32_t combination; + uint8_t flags; uint8_t *payload; }; @@ -96,6 +100,7 @@ struct nc *nc_new(uint32_t max_window_size, uint32_t datagram_size) return NULL; } n->tx.max_queue_size = max_window_size; + n->tx.window_size = 4; n->rx.packets = calloc(sizeof(struct nc_packet)*max_window_size*2,1); if (!n->rx.packets){ free(n->tx.packets); @@ -142,12 +147,13 @@ int nc_tx_enqueue_datagram(struct nc *n, unsigned char *d, int len) n->tx.packets[index].payload = malloc(len); n->tx.packets[index].sequence = seq; n->tx.packets[index].combination = 0x80000000; + n->tx.packets[index].flags = FLAG_NEW; bcopy(d, n->tx.packets[index].payload, len); n->tx.queue_size++; return 0; } -static int _nc_get_ack(struct nc_half *n, uint32_t *first_unseen) +static int _nc_get_ack(struct nc_half *n, uint32_t *first_unseen, uint32_t *window_size) { uint32_t seq; @@ -156,15 +162,21 @@ static int _nc_get_ack(struct nc_half *n, uint32_t *first_unseen) if (n->packets[index].sequence != seq || !n->packets[index].payload) break; } - if (seq > n->deliver_next+16){ - seq = n->deliver_next+16; - } + int blocked = seq - (int)n->deliver_next; + if (blocked>=n->max_queue_size) + *window_size = 1; + else + *window_size = n->max_queue_size - blocked; *first_unseen = seq; return 0; } -static int _nc_ack(struct nc_half *n, uint32_t first_unseen) +static int _nc_ack(struct nc_half *n, uint32_t first_unseen, uint32_t window_size) { + if (window_size>n->max_queue_size) + window_size=n->max_queue_size; + n->window_size = window_size; + // ignore invalid input or no new information if (first_unseen <= n->window_start || first_unseen > n->window_start + n->queue_size) @@ -208,29 +220,36 @@ static int _nc_tx_combine_random_payloads(struct nc_half *n, struct nc_packet *p // TODO: Check that combination is linearly independent of recently produced // combinations, i.e., that it contributes information. - // get 32 bit random number. random() only returns 31 bits, - // hence the double call and shift - - uint32_t combination; - combination=random()^(random()<<1); - - // restrict set bits to only those in the window - // i.e., zero lower (32-n->window_used) bits - combination=(combination>>(32-n->queue_size))<<(32-n->queue_size); - // Never send all zeros, since that conveys no information. - - bzero(packet->payload, n->datagram_size); - + // (always send the first packet in the window) + uint32_t combination=1; int i; - for(i=0;imax_queue_size;i++) { + // give every other packet about 1/4 chance of being included + for (i=0;i<8;i++) + combination|=(1<<(random()&31)); + + bzero(packet->payload, n->datagram_size); + int added_new=0; + + for(i=0;iwindow_size;i++) { int index = (n->window_start + i) & (n->max_queue_size -1); // assume we might have gaps in the payload list if we are a retransmitter in the network path if (!n->packets[index].payload) continue; - // always send the first packet in the window - if ((combination&0x80000000)||n->packets[index].sequence==packet->sequence) + + // there's no point including more than one new packet + // only if we have some packet loss will we need to send more combinations of packets + if (n->packets[index].flags & FLAG_NEW){ + if (added_new) + continue; _combine_packets(&n->packets[index], packet, n->datagram_size); - combination<<=1; + added_new = 1; + n->packets[index].flags =0; + continue; + } + + if (combination&1) + _combine_packets(&n->packets[index], packet, n->datagram_size); + combination>>=1; } return 0; @@ -240,26 +259,27 @@ static int _nc_tx_combine_random_payloads(struct nc_half *n, struct nc_packet *p int nc_tx_produce_packet(struct nc *n, uint8_t *datagram, uint32_t buffer_size) { // TODO: Don't waste more bytes than we need to on the bitmap and sequence number - if (buffer_size < n->tx.datagram_size+12) + if (buffer_size < n->tx.datagram_size+HEADER_LEN) return -1; - uint32_t unseen; - if (_nc_get_ack(&n->rx, &unseen)) + uint32_t unseen, window_size; + if (_nc_get_ack(&n->rx, &unseen, &window_size)) return -1; write_uint32(&datagram[0], unseen); + write_uint32(&datagram[4], window_size); if (!n->tx.queue_size){ // No data to send, just send an ack // TODO don't ack too often - return 4; + return 8; } // Produce linear combination struct nc_packet packet={ .sequence = n->tx.window_start, .combination = 0, - .payload = &datagram[12], + .payload = &datagram[HEADER_LEN], }; if (_nc_tx_combine_random_payloads(&n->tx, &packet)) @@ -267,9 +287,9 @@ int nc_tx_produce_packet(struct nc *n, uint8_t *datagram, uint32_t buffer_size) // TODO assert actual_combination? (should never be zero) // Write out bitmap of actual combinations involved - write_uint32(&datagram[4], packet.sequence); - write_uint32(&datagram[8], packet.combination); - return 12+n->tx.datagram_size; + write_uint32(&datagram[8], packet.sequence); + write_uint32(&datagram[12], packet.combination); + return HEADER_LEN+n->tx.datagram_size; } static int _nc_rx_combine_packet(struct nc_half *n, struct nc_packet *packet) @@ -350,22 +370,23 @@ static void _nc_rx_advance_window(struct nc_half *n, uint32_t new_window_start) int nc_rx_packet(struct nc *n, uint8_t *payload, int len) { - if (len!=4 && len != 12+n->rx.datagram_size) + if (len!=8 && len != HEADER_LEN+n->rx.datagram_size) return -1; uint32_t unseen = read_uint32(payload); + uint32_t window_size = read_uint32(&payload[4]); - _nc_ack(&n->tx, unseen); + _nc_ack(&n->tx, unseen, window_size); - if (len < 12+n->rx.datagram_size){ + if (len < HEADER_LEN+n->rx.datagram_size){ return 0; } - uint32_t new_window_start = read_uint32(&payload[4]); + uint32_t new_window_start = read_uint32(&payload[8]); struct nc_packet packet={ .sequence = new_window_start, - .combination = read_uint32(&payload[8]), - .payload = &payload[12], + .combination = read_uint32(&payload[12]), + .payload = &payload[HEADER_LEN], }; int r = _nc_rx_combine_packet(&n->rx, &packet); @@ -490,6 +511,7 @@ int nc_test() // Prepare some random datagrams for subsequent tests int i; + int sent =0; uint8_t datagrams[8][200]; for (i=0;i<8;i++) nc_test_random_datagram(datagrams[i],200); @@ -501,20 +523,20 @@ int nc_test() int j=0; for(i=0;i<10;i++) { - uint8_t outbuffer[12+200]; + uint8_t outbuffer[HEADER_LEN+200]; int len=sizeof(outbuffer); int written = nc_tx_produce_packet(tx, outbuffer, len); if (written==-1) FAIL("Produce random linear combination of single packet for TX"); if (i==9) PASS("Produce random linear combination of single packet for TX"); - uint32_t combination = read_uint32(&outbuffer[8]); + uint32_t combination = read_uint32(&outbuffer[12]); if (!combination) FAIL("Should not produce empty linear combination bitmap"); if (i==9) PASS("Should not produce empty linear combination bitmap"); - if (memcmp(&outbuffer[12], datagrams[0], 200)!=0) + if (memcmp(&outbuffer[HEADER_LEN], datagrams[0], 200)!=0) FAIL("Output identity datagram when only one in queue"); if (i==9) PASS("Output identity datagram when only one in queue"); @@ -554,12 +576,13 @@ int nc_test() // now can we receive this first packet? { - uint8_t outbuffer[12+200]; + uint8_t outbuffer[HEADER_LEN+200]; int written = nc_tx_produce_packet(tx, outbuffer, sizeof(outbuffer)); ASSERT(written!=-1, "Produce packet"); int r=nc_rx_packet(rx, outbuffer, written); ASSERT(r!=-1, "Receive packet"); - } + sent ++; + } // can we decode it? @@ -572,7 +595,7 @@ int nc_test() // acknowledging this first packet advances the window { - uint8_t outbuffer[12+200]; + uint8_t outbuffer[HEADER_LEN+200]; int written = nc_tx_produce_packet(rx, outbuffer, sizeof(outbuffer)); ASSERT(written!=-1, "Produce ACK"); int r=nc_rx_packet(tx, outbuffer, written); @@ -590,14 +613,14 @@ int nc_test() int decoded=0; for(i=0;i<100;i++) { - uint8_t outbuffer[12+200]; + uint8_t outbuffer[HEADER_LEN+200]; int len=sizeof(outbuffer); int written = nc_tx_produce_packet(tx, outbuffer, len); if (written==-1) FAIL("Produce random linear combination of multiple packets for TX"); if (i==0) PASS("Produce random linear combination of multiple packets for TX"); - uint32_t combination = read_uint32(&outbuffer[8]); + uint32_t combination = read_uint32(&outbuffer[12]); if (!combination) FAIL("Should not produce empty linear combination bitmap"); if (i==0) @@ -605,7 +628,7 @@ int nc_test() for(j=0;j<200;j++) { int k; - uint8_t x = outbuffer[12+j]; + uint8_t x = outbuffer[HEADER_LEN+j]; for (k=0;k<8;k++){ if (combination&(0x80000000>>k)) x^=datagrams[k+1][j]; @@ -616,7 +639,9 @@ int nc_test() if (i==0) PASS("Output linear combination from multiple packets in the queue"); - nc_rx_packet(rx, outbuffer, written); + if (nc_rx_packet(rx, outbuffer, written)!=1) + sent ++; + while(1){ uint8_t out[200]; int size = nc_rx_next_delivered(rx, out, sizeof(out)); @@ -635,7 +660,7 @@ int nc_test() // acknowledging first 8 packets advances the tx window { - uint8_t outbuffer[12+200]; + uint8_t outbuffer[HEADER_LEN+200]; int written = nc_tx_produce_packet(rx, outbuffer, sizeof(outbuffer)); ASSERT(written!=-1, "Produce ACK"); int r=nc_rx_packet(tx, outbuffer, written); @@ -644,7 +669,11 @@ int nc_test() ASSERT(tx->tx.queue_size==0, "ACK causes packets to be discarded"); } - int sent =0; + int histogram[64]; + int uninteresting=0; + int dropped=0; + bzero(histogram, sizeof(histogram)); + while(rx->rx.deliver_next < 10000){ // fill the transmit window whenever there is space while(tx->tx.queue_sizetx.max_queue_size){ @@ -653,29 +682,41 @@ int nc_test() } // generate a packet in each direction - uint8_t one[12+200]; - uint8_t two[12+200]; + uint8_t one[HEADER_LEN+200]; + uint8_t two[HEADER_LEN+200]; sent++; int wone = nc_tx_produce_packet(tx, one, sizeof(one)); int wtwo = nc_tx_produce_packet(rx, two, sizeof(two)); // receive each packet - int rone=nc_rx_packet(rx, one, wone); - nc_rx_packet(tx, two, wtwo); - - if (rone!=1){ - // deliver anything that can be decoded - while(1){ - uint8_t out[200]; - int size = nc_rx_next_delivered(rx, out, sizeof(out)); - if (size!=200) - break; - decoded++; - } - } + if (!(random()&7)) + nc_rx_packet(tx, two, wtwo); + if (!(random()&7)){ + if (nc_rx_packet(rx, one, wone)!=1){ + int burst_len=0; + // deliver anything that can be decoded + while(1){ + uint8_t out[200]; + int size = nc_rx_next_delivered(rx, out, sizeof(out)); + if (size!=200) + break; + decoded++; + burst_len++; + } + histogram[burst_len]++; + }else + uninteresting++; + }else + dropped++; } PASS("Delivered 10000 packets after sending %d", sent); + fprintf(stderr, "Received = %d\n", sent - dropped); + fprintf(stderr, "Unint = %d\n", uninteresting); + fprintf(stderr, "Delivery burst histogram;\n"); + for (i=0;i<64;i++) + if (histogram[i]) + fprintf(stderr, "%d = %d (%d)\n", i, histogram[i], i*histogram[i]); nc_free(tx); nc_free(rx); PASS("Release memory");