From 746b33b53de5fba2f98678af8d333603a73a84d0 Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 11 Oct 2013 20:59:54 +1030 Subject: [PATCH] added initial work on network coding. Refactor network coding --- .gitignore | 1 + Makefile.in | 3 + network_coding.c | 739 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 743 insertions(+) create mode 100644 network_coding.c diff --git a/.gitignore b/.gitignore index 5ab0abeb..032558a3 100644 --- a/.gitignore +++ b/.gitignore @@ -21,6 +21,7 @@ serval.c /directory_service /tfw_createfile /fakeradio +/network_coding *.so test.*.log testlog diff --git a/Makefile.in b/Makefile.in index 87fb9452..093535e2 100644 --- a/Makefile.in +++ b/Makefile.in @@ -111,6 +111,9 @@ fakeradio: fakeradio.o @echo LINK $@ @$(CC) $(CFLAGS) -Wall -o $@ fakeradio.o +network_coding: network_coding.c + @$(CC) -DRUNTESTS -Wall -o $@ network_coding.c + # This does not build on 64 bit elf platforms as NaCL isn't built with -fPIC # DOC 20120615 libservald.so: $(OBJS) version.o diff --git a/network_coding.c b/network_coding.c new file mode 100644 index 00000000..65f5b271 --- /dev/null +++ b/network_coding.c @@ -0,0 +1,739 @@ +/* +Serval DNA network coding functions +Copyright (C) 2013 Paul Gardner-Stephen + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +*/ + +#include +#include +#include +#include +#include +#include + +struct nc_packet{ + uint32_t sequence; + uint32_t combination; + uint8_t *payload; +}; + +struct nc_half { + // Define the size parameters of the network coding data structures + uint32_t window_size; // limited to the number of bits we can fit in a uint32_t + uint32_t window_start; // sequence of first datagram in sending window + uint32_t datagram_size; // number of bytes in each fixed sized unit + uint32_t deliver_next; // sequence of next packet that should be delivered + // dynamically sized array of pointers to packet buffers + // each packet will be stored in the array based on the low order bits of the sequence number + struct nc_packet *packets; + // size of packet array, should be an exact 2^n in size to simplify storing sequence numbered packets + // At the sender, this is the maximum window size + // At the receiver, this should be 2*maximum window size to allow for older packets we can't decode yet + uint32_t max_queue_size; + uint32_t queue_size; // # of packets currently in the array +}; + +struct nc{ + struct nc_half tx; + struct nc_half rx; +}; + +void write_uint32(unsigned char *o,uint32_t v) +{ + int i; + for(i=0;i<4;i++) + { *(o++)=v&0xff; v=v>>8; } +} + +uint32_t read_uint32(unsigned char *o) +{ + int i; + uint32_t v=0; + for(i=0;i<4;i++) v=(v<<8)|o[4-1-i]; + return v; +} + +static void _nc_free_half(struct nc_half *n) +{ + int i; + for (i=0;imax_queue_size;i++){ + if (n->packets[i].payload) + free(n->packets[i].payload); + } + free(n->packets); +} + +int nc_free(struct nc *n) +{ + _nc_free_half(&n->tx); + _nc_free_half(&n->rx); + free(n); + return 0; +} + +struct nc *nc_new(uint32_t max_window_size, uint32_t datagram_size) +{ + struct nc *n = calloc(sizeof(struct nc),1); + if (!n) + return NULL; + n->tx.packets = calloc(sizeof(struct nc_packet)*max_window_size,1); + if (!n->tx.packets){ + free(n); + return NULL; + } + n->tx.max_queue_size = max_window_size; + n->rx.packets = calloc(sizeof(struct nc_packet)*max_window_size*2,1); + if (!n->rx.packets){ + free(n->tx.packets); + free(n); + return NULL; + } + n->rx.max_queue_size = max_window_size*2; + n->tx.datagram_size = datagram_size; + n->rx.datagram_size = datagram_size; + return n; +} + +int nc_test_dump(char *name, unsigned char *addr, int len); +int nc_test_dump_rx_queue(char *msg,struct nc *n); + +int nc_tx_has_room(struct nc *n) +{ + // On the TX side the maximum number of queued packets + // is the MINIMUM of the maximum queue size and the window + // size. + if (n->tx.queue_size>=n->tx.max_queue_size) + return 0; + return 1; +} + +int nc_tx_enqueue_datagram(struct nc *n, unsigned char *d, int len) +{ + if (len!=n->tx.datagram_size){ + fprintf(stderr, "Invalid length %d (%d)\n", len, n->tx.datagram_size); + return -1; + } + if (!nc_tx_has_room(n)){ + fprintf(stderr, "No room (%d, %d)\n", n->tx.queue_size, n->tx.max_queue_size); + return -1; + } + + // Add datagram to queue + int seq = n->tx.window_start + n->tx.queue_size; + int index = seq & (n->tx.max_queue_size -1); + if (n->tx.packets[index].payload){ + fprintf(stderr, "Attempted to replace TX payload %d (%d) without freeing it first\n",index,n->tx.packets[index].sequence); + exit(-1); + } + n->tx.packets[index].payload = malloc(len); + n->tx.packets[index].sequence = seq; + n->tx.packets[index].combination = 0x80000000; + bcopy(d, n->tx.packets[index].payload, len); + n->tx.queue_size++; + return 0; +} + +static int _nc_get_ack(struct nc_half *n, uint32_t *first_unseen) +{ + uint32_t seq; + + for (seq = n->window_start; ;seq++){ + int index = seq & (n->max_queue_size -1); + if (n->packets[index].sequence != seq || !n->packets[index].payload) + break; + } + if (seq > n->deliver_next+16){ + seq = n->deliver_next+16; + } + *first_unseen = seq; + return 0; +} + +static int _nc_ack(struct nc_half *n, uint32_t first_unseen) +{ + // ignore invalid input or no new information + if (first_unseen <= n->window_start || + first_unseen > n->window_start + n->queue_size) + return -1; + + // release any seen packets + while(n->window_start < first_unseen){ + int index = n->window_start & (n->max_queue_size -1); + if (n->packets[index].payload){ + free(n->packets[index].payload); + n->packets[index].payload=NULL; + n->queue_size--; + } + n->window_start++; + } + return 0; +} + +static uint32_t _combine_masks(const struct nc_packet *src, const struct nc_packet *dst) +{ + int offset = src->sequence - dst->sequence; + uint32_t mask = src->combination >> offset; + if ((mask << offset) != src->combination){ +// fprintf(stderr, "Invalid mask combination (%d, %d, %d, %08x, %08x, %08x)\n", +// src->sequence, dst->sequence, offset, src->combination, mask, (mask << offset)); + return 0xFFFFFFFF; + } + return dst->combination ^ mask; +} + +static void _combine_packets(const struct nc_packet *src, struct nc_packet *dst, uint32_t datagram_size) +{ + // TODO verify that this combination mask is set correctly. + dst->combination = _combine_masks(src, dst); + int i; + for(i=0;ipayload[i]^=src->payload[i]; +} + +static int _nc_tx_combine_random_payloads(struct nc_half *n, struct nc_packet *packet){ + // TODO: Check that combination is linearly independent of recently produced + // combinations, i.e., that it contributes information. + + // get 32 bit random number. random() only returns 31 bits, + // hence the double call and shift + + uint32_t combination; + combination=random()^(random()<<1); + + // restrict set bits to only those in the window + // i.e., zero lower (32-n->window_used) bits + combination=(combination>>(32-n->queue_size))<<(32-n->queue_size); + // Never send all zeros, since that conveys no information. + + bzero(packet->payload, n->datagram_size); + + int i; + for(i=0;imax_queue_size;i++) { + int index = (n->window_start + i) & (n->max_queue_size -1); + // assume we might have gaps in the payload list if we are a retransmitter in the network path + if (!n->packets[index].payload) + continue; + // always send the first packet in the window + if ((combination&0x80000000)||n->packets[index].sequence==packet->sequence) + _combine_packets(&n->packets[index], packet, n->datagram_size); + combination<<=1; + } + + return 0; +} + +// construct a packet and return the payload size +int nc_tx_produce_packet(struct nc *n, uint8_t *datagram, uint32_t buffer_size) +{ + // TODO: Don't waste more bytes than we need to on the bitmap and sequence number + if (buffer_size < n->tx.datagram_size+12) + return -1; + + uint32_t unseen; + if (_nc_get_ack(&n->rx, &unseen)) + return -1; + + write_uint32(&datagram[0], unseen); + + if (!n->tx.queue_size){ + // No data to send, just send an ack + // TODO don't ack too often + return 4; + } + + // Produce linear combination + struct nc_packet packet={ + .sequence = n->tx.window_start, + .combination = 0, + .payload = &datagram[12], + }; + + if (_nc_tx_combine_random_payloads(&n->tx, &packet)) + return -1; + + // TODO assert actual_combination? (should never be zero) + // Write out bitmap of actual combinations involved + write_uint32(&datagram[4], packet.sequence); + write_uint32(&datagram[8], packet.combination); + return 12+n->tx.datagram_size; +} + +static int _nc_rx_combine_packet(struct nc_half *n, struct nc_packet *packet) +{ + int i; + + // First, reduce the combinations of the incoming packet based on other packets already seen + for (i=0;imax_queue_size;i++){ + if (!n->packets[i].payload || n->packets[i].sequence < packet->sequence) + continue; +// printf("Reducing incoming packet (%d, %08x) w. existing (%d, %08x)\n", +// packet->sequence, packet->combination, +// n->packets[i].sequence, n->packets[i].combination); + uint32_t new_mask = _combine_masks(&n->packets[i], packet); + + // rx packet doesn't add any new information + if (new_mask==0){ + return 1; + } + + if (new_mask < packet->combination){ + _combine_packets(&n->packets[i], packet, n->datagram_size); + } + } + + // the new packet must contain new information that will cause a new packet to be seen. + int shift = __builtin_clz(packet->combination); + packet->sequence += shift; + packet->combination <<= shift; + + int index = packet->sequence & (n->max_queue_size -1); + if (n->packets[index].payload){ + fprintf(stderr, "Attempted to replace RX payload %d (%d) with %d without freeing it first\n",index,n->packets[index].sequence, packet->sequence); + exit(-1); + } + + // try to duplicate the payload first, we don't want to reduce existing packets if this fails. + unsigned char *dup_payload = malloc(n->datagram_size); + if (!dup_payload) + return -1; + bcopy(packet->payload, dup_payload, n->datagram_size); + // reduce other stored packets + for (i=0;imax_queue_size;i++){ + if (!n->packets[i].payload || n->packets[i].sequence > packet->sequence) + continue; +// printf("Reducing existing packet (%d, %08x) w. incoming (%d, %08x)\n", +// n->packets[i].sequence, n->packets[i].combination, +// packet->sequence, packet->combination); + uint32_t new_mask = _combine_masks(packet, &n->packets[i]); + if (new_mask < n->packets[i].combination){ + _combine_packets(packet, &n->packets[i], n->datagram_size); + } + } + + // add the packet to our incoming list + n->packets[index]=*packet; + n->packets[index].payload = dup_payload; + n->queue_size++; + return 0; +} + +static void _nc_rx_advance_window(struct nc_half *n, uint32_t new_window_start) +{ + // advance the window start to match the sender + // drop any payloads that have already been delivered + while(n->window_start < new_window_start){ + if (n->window_start < n->deliver_next){ + int index = n->window_start & (n->max_queue_size -1); + if (n->packets[index].payload){ + free(n->packets[index].payload); + n->packets[index].payload=NULL; + n->queue_size--; + } + } + n->window_start++; + } +} + +int nc_rx_packet(struct nc *n, uint8_t *payload, int len) +{ + if (len!=4 && len != 12+n->rx.datagram_size) + return -1; + + uint32_t unseen = read_uint32(payload); + + _nc_ack(&n->tx, unseen); + + if (len < 12+n->rx.datagram_size){ + return 0; + } + + uint32_t new_window_start = read_uint32(&payload[4]); + struct nc_packet packet={ + .sequence = new_window_start, + .combination = read_uint32(&payload[8]), + .payload = &payload[12], + }; + + int r = _nc_rx_combine_packet(&n->rx, &packet); + + _nc_rx_advance_window(&n->rx, new_window_start); + + return r; +} + +// After each nc_rx_packet, call this function repeatedly to retrieve all decoded payloads in order +int nc_rx_next_delivered(struct nc *n, uint8_t *payload, int buffer_size) +{ + if (buffer_size < n->rx.datagram_size) + return -1; + + int index = n->rx.deliver_next & (n->rx.max_queue_size -1); + if (!n->rx.packets[index].payload || + n->rx.packets[index].combination != 0x80000000) + return 0; + + bcopy(n->rx.packets[index].payload, payload, n->rx.datagram_size); + n->rx.deliver_next++; + // drop the payload if the sender has already advanced + if (n->rx.deliver_next<=n->rx.window_start){ + free(n->rx.packets[index].payload); + n->rx.packets[index].payload=NULL; + n->rx.queue_size--; + } + return n->rx.datagram_size; +} + +static int _nc_dump_half(struct nc_half *n) +{ + fprintf(stderr, " window start; %d\n", n->window_start); + fprintf(stderr, " queue size; %d\n", n->queue_size); + fprintf(stderr, " max queue size; %d\n", n->max_queue_size); + int i; + for (i=0;imax_queue_size;i++){ + if (!n->packets[i].payload) + continue; + fprintf(stderr, " %02d: 0x%08x, 0x%08x ", + i, n->packets[i].sequence, n->packets[i].combination); + int j; + for(j=0;j<32;j++) + fprintf(stderr, "%0d",(n->packets[i].combination>>(31-j))&1); + fprintf(stderr, " "); + for(j=0;j<16;j++) + fprintf(stderr, "%02x",n->packets[i].payload[j]); + fprintf(stderr, "\n"); + } + return 0; +} + +static void _nc_dump(struct nc *n) +{ + fprintf(stderr, "TX\n"); + _nc_dump_half(&n->tx); + fprintf(stderr, "RX\n"); + fprintf(stderr, " delivered; %d\n", n->rx.deliver_next); + _nc_dump_half(&n->rx); +} + +#ifdef RUNTESTS +/* TODO: Tests that should be written. + 1. nc_new() works, and rejects bad input. + 2. nc_free() works, including on partially initialised structures. + 3. nc_tx_enqueue_datagram() works, including failing on bad input and when the + queue is full. + 4. nc_tx_ack_dof() works, rejects bad input, and correctly releases buffers. + 5. nc_tx_random_linear_combination() works, rejects bad input, and produces valid + linear combinations of the enqueued datagrams, and never produces all zeroes. + 6. nc_rx_linear_combination() works, rejects bad input + 7. nc_rx_linear_combination() rejects when RX queue full, when combination starts + before current window. +*/ + +int nc_test_random_datagram(uint8_t *d,int len) +{ + int i; + for(i=0;itx.queue_size!=1){ + fprintf(stderr,"FAIL: Enqueueing datagram increases queue_size\n"); + return -1; + } + fprintf(stderr,"PASS: Enqueueing datagram increases queue_size\n"); + + int j=0; + for(i=0;i<10;i++) { + uint8_t outbuffer[12+200]; + int len=sizeof(outbuffer); + int written = nc_tx_produce_packet(tx, outbuffer, len); + if (written==-1){ + fprintf(stderr,"FAIL: Produce random linear combination of single packet for TX\n"); + return -1; + } + if (i==9) + fprintf(stderr,"PASS: Produce random linear combination of single packet for TX\n"); + uint32_t combination = read_uint32(&outbuffer[8]); + if (!combination) { + fprintf(stderr,"FAIL: Should not produce empty linear combination bitmap\n"); + return -1; + } + if (i==9) + fprintf(stderr,"PASS: Should not produce empty linear combination bitmap\n"); + + if (memcmp(&outbuffer[12], datagrams[0], 200)!=0){ + fprintf(stderr,"FAIL: Output identity datagram when only one in queue\n"); + return -1; + } + if (i==9) + fprintf(stderr,"PASS: Output identity datagram when only one in queue\n"); + } + + // can we combine seq & combination as expected? + + struct nc_packet headers[]={ + { + .sequence = 0, + .combination = 0xF0000000 + }, + { + .sequence = 0, + .combination = 0x80000000 + }, + { + .sequence = 1, + .combination = 0xE0000000 + }, + { + .sequence = 2, + .combination = 0xC0000000 + }, + { + .sequence = 3, + .combination = 0x80000000 + }, + }; + + for (i=0;i<5;i++){ + for (j=i;j<5;j++){ + _combine_masks(&headers[j], &headers[i]); + // TODO assert... + } + } + + // now can we receive this first packet? + { + uint8_t outbuffer[12+200]; + int written = nc_tx_produce_packet(tx, outbuffer, sizeof(outbuffer)); + if (written==-1){ + fprintf(stderr, "Failed to produce packet\n"); + exit(-1); + } + int r=nc_rx_packet(rx, outbuffer, written); + if (r==-1){ + fprintf(stderr, "Failed to receive packet\n"); + exit(-1); + } + } + + // can we decode it? + + { + uint8_t outbuffer[200]; + int size = nc_rx_next_delivered(rx, outbuffer, sizeof(outbuffer)); + if (size!=200){ + fprintf(stderr,"FAIL: Expected to receive 200 bytes, got %d\n", size); + exit(-1); + } + if (memcmp(outbuffer, datagrams[0], 200) != 0){ + fprintf(stderr,"FAIL: Output of first packet should match incoming packet\n"); + return -1; + } + } + + // acknowledging this first packet advances the window + { + uint8_t outbuffer[12+200]; + int written = nc_tx_produce_packet(rx, outbuffer, sizeof(outbuffer)); + if (written==-1){ + fprintf(stderr, "FAIL: Producing ACK\n"); + exit(-1); + } + int r=nc_rx_packet(tx, outbuffer, written); + if (r==-1){ + fprintf(stderr,"FAIL: processing ACK\n"); + return -1; + } + if (tx->tx.window_start!=1){ + fprintf(stderr,"FAIL: acknowledgement advances window_start (is %d, should be 1)\n",tx->tx.window_start); + return -1; + } + fprintf(stderr,"PASS: acknowledgement advances window_start\n"); + if (tx->tx.queue_size!=0){ + fprintf(stderr,"FAIL: acknowledgement causes packets to be discarded (is %d, should be 0)\n",tx->tx.queue_size); + return -1; + } + fprintf(stderr,"PASS: acknowledgement causes packets to be discarded\n"); + } + + for (i=1;i<8;i++){ + if (nc_tx_enqueue_datagram(tx,datagrams[i],200)) { + fprintf(stderr,"FAIL: Failed to enqueue datagram %d for TX\n", i); + return -1; + } + if (tx->tx.queue_size!=i){ + fprintf(stderr,"FAIL: Enqueueing datagram increases queue_size\n"); + return -1; + } + } + + int decoded=0; + for(i=0;i<100;i++) { + uint8_t outbuffer[12+200]; + int len=sizeof(outbuffer); + int written = nc_tx_produce_packet(tx, outbuffer, len); + if (written==-1){ + fprintf(stderr,"FAIL: Produce random linear combination of multiple packets for TX\n"); + return -1; + } + if (i==0) + fprintf(stderr,"PASS: Produce random linear combination of multiple packets for TX\n"); + uint32_t combination = read_uint32(&outbuffer[8]); + if (!combination) { + fprintf(stderr,"FAIL: Should not produce empty linear combination bitmap\n"); + return -1; + } + if (i==0) + fprintf(stderr,"PASS: Should not produce empty linear combination bitmap\n"); + + for(j=0;j<200;j++) { + int k; + uint8_t x = outbuffer[12+j]; + for (k=0;k<8;k++){ + if (combination&(0x80000000>>k)) + x^=datagrams[k+1][j]; + } + if (x){ + fprintf(stderr,"FAIL: Output linear combination from multiple packets in the queue\n"); + return -1; + } + } + if (i==0) + fprintf(stderr,"PASS: Output linear combination from multiple packets in the queue\n"); + + nc_rx_packet(rx, outbuffer, written); + while(1){ + uint8_t out[200]; + int size = nc_rx_next_delivered(rx, out, sizeof(out)); + if (size!=200) + break; + decoded++; + if (memcmp(out, datagrams[decoded], 200) != 0){ + fprintf(stderr,"FAIL: Output of %d packet should match incoming packet (@%d, %02x != %02x)\n", decoded, j, out[j], datagrams[decoded][j]); + return -1; + } + if (decoded==7) + fprintf(stderr,"PASS: First 8 packets delivered\n"); + } + } + + if (decoded!=7){ + fprintf(stderr,"FAIL: First 8 packets should have been delivered by now\n"); + return -1; + } + + // acknowledging first 8 packets advances the tx window + { + uint8_t outbuffer[12+200]; + int written = nc_tx_produce_packet(rx, outbuffer, sizeof(outbuffer)); + if (written==-1){ + fprintf(stderr, "FAIL: Producing ACK\n"); + exit(-1); + } + int r=nc_rx_packet(tx, outbuffer, written); + if (r==-1){ + fprintf(stderr,"FAIL: processing ACK\n"); + return -1; + } + if (tx->tx.window_start!=8){ + fprintf(stderr,"FAIL: acknowledgement advances window_start (is %d, should be 8)\n",tx->tx.window_start); + return -1; + } + fprintf(stderr,"PASS: acknowledgement advances window_start\n"); + if (tx->tx.queue_size!=0){ + fprintf(stderr,"FAIL: acknowledgement causes packets to be discarded (is %d, should be 0)\n",tx->tx.queue_size); + return -1; + } + fprintf(stderr,"PASS: acknowledgement causes packets to be discarded\n"); + } + + int sent =0; + while(rx->rx.deliver_next < 10000){ + // fill the transmit window whenever there is space + while(tx->tx.queue_sizetx.max_queue_size){ + if (nc_tx_enqueue_datagram(tx,datagrams[(tx->tx.window_start+tx->tx.queue_size+1)%7],200)){ + fprintf(stderr,"FAIL: Failed to dispatch pre-fill datagram\n"); + return -1; + } + } + + // generate a packet in each direction + uint8_t one[12+200]; + uint8_t two[12+200]; + sent++; + int wone = nc_tx_produce_packet(tx, one, sizeof(one)); + int wtwo = nc_tx_produce_packet(rx, two, sizeof(two)); + + // receive each packet + int rone=nc_rx_packet(rx, one, wone); + nc_rx_packet(tx, two, wtwo); + + if (rone!=1){ + // deliver anything that can be decoded + while(1){ + uint8_t out[200]; + int size = nc_rx_next_delivered(rx, out, sizeof(out)); + if (size!=200) + break; + decoded++; + } + } + } + fprintf(stderr, "PASS: Delivered 10000 packets after sending %d\n", sent); + + nc_free(tx); + nc_free(rx); + fprintf(stderr,"PASS: Release memory\n"); + + return 0; +} + +int main(int argc,char **argv) +{ + return nc_test(); +} + +#endif