From e29564bc6cb2eb78274efa2a8efbbf8cf7b6003a Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Wed, 11 Sep 2013 15:03:43 +0930 Subject: [PATCH] Refactor fakeradio - completely event driven and non-blocking - modelling of tx & rx serial buffers that will truncate on overfilling - each radio takes turns to send a packet, with approx TDMA overhead --- fakeradio.c | 397 +++++++++++++++++++++++++++----------------------- tests/routing | 2 +- 2 files changed, 219 insertions(+), 180 deletions(-) diff --git a/fakeradio.c b/fakeradio.c index d740a897..f39afd88 100644 --- a/fakeradio.c +++ b/fakeradio.c @@ -11,20 +11,23 @@ #include #include +int radio_packet_size=256; int chars_per_ms=1; -int ber=0; +long ber=0; struct radio_state { + int fd; int state; + const char *name; char commandbuffer[128]; int cb_len; unsigned char txbuffer[1024]; int txb_len; + unsigned char rxbuffer[1024]; + int rxb_len; long long last_char_ms; - long long last_tx_ms; - long long last_rssi_time_ms; + long long next_rssi_time_ms; int rssi_output; - int tx_rate; }; #define STATE_ONLINE 0 @@ -42,235 +45,271 @@ long long gettime_ms() return nowtv.tv_sec * 1000LL + nowtv.tv_usec / 1000; } -int emitChar(int fd,unsigned char c) -{ - // Introduce bit errors as required - int i; - - for(i=0;i<8;i++) { - if (random()0) off+=w; - break; - } - return 0; + if (len==-1) + len = strlen(bytes); + if (len + s->rxb_len > sizeof(s->rxbuffer)) + return -1; + bcopy(bytes, &s->rxbuffer[s->rxb_len], len); + s->rxb_len+=len; + return len; } -int processCommand(int fd,struct radio_state *s,int out_fd) +int processCommand(struct radio_state *s) { if (!s->cb_len) return 0; s->commandbuffer[s->cb_len]=0; char *cmd=s->commandbuffer; + + log_time(); + fprintf(stderr, "Processing command from %s \"%s\"\n", s->name, cmd); + if (!strcasecmp(cmd,"ATO")) { - emit(fd,"OK\r"); + append_bytes(s, "OK\r", -1); s->state=STATE_ONLINE; return 0; } if (!strcasecmp(cmd,"AT&T")) { - emit(fd,"OK\r"); + append_bytes(s, "OK\r", -1); s->rssi_output=0; return 0; } if (!strcasecmp(cmd,"AT&T=RSSI")) { - emit(fd,"OK\r"); + append_bytes(s, "OK\r", -1); s->rssi_output=1; return 0; } if (!strcasecmp(cmd,"ATI")) { - emit(fd,"RFD900a SIMULATOR 1.6\r"); - emit(fd,"OK\r"); + append_bytes(s, "RFD900a SIMULATOR 1.6\rOK\r", -1); return 0; } - emit(fd,"ERROR\r"); + append_bytes(s, "ERROR\r", -1); return 1; } -int dump(char *name,unsigned char *addr,int len) +int dump(char *name, unsigned char *addr, int len) { int i,j; - fprintf(stderr,"Dump of %s\n",name); - for(i=0;i=' '&&addr[i+j]<0x7f?addr[i+j]:'.'); - fprintf(stderr,"\n"); - } + if (name) + fprintf(stderr,"Dump of %s\n",name); + for(i=0;i=' '&&addr[i+j]<0x7f?addr[i+j]:'.'); + fprintf(stderr,"\n"); + } return 0; } -int print_report=0; - -int updateState(int fd,struct radio_state *s,int out_fd) +int read_bytes(struct radio_state *s) { + unsigned char buff[256]; int i; - - print_report=0; - - // Read bytes from stdin - int bytes=read(fd,&s->txbuffer[s->txb_len],sizeof(s->txbuffer)-s->txb_len); - if (bytes>0) { s->txb_len+=bytes; print_report=1; } - - // Switch to command mode if required - if (bytes<1&&s->state==STATE_PLUSPLUSPLUS&& - (gettime_ms()-s->last_char_ms)>=1000) { - s->state=STATE_COMMAND; - print_report=1; - emit(fd,"OK\r\n"); - } else - if (bytes>0) - s->last_char_ms=gettime_ms(); - - if (bytes>0) { - fprintf(stderr,"#%d Received %d bytes: ",fd,bytes); - dump("received bytes",&s->txbuffer[s->txb_len-bytes],bytes); + int bytes=read(s->fd,buff,sizeof(buff)); + if (bytes<=0) + return bytes; + log_time(); + fprintf(stderr, "Read from %s\n", s->name); + dump(NULL,buff,bytes); + s->last_char_ms = gettime_ms(); + + // process incoming bytes + for (i=0;istate==STATE_COMMAND){ + if (buff[i]=='\r'||buff[i]=='\n'){ + // and process the commend on EOL + processCommand(s); + s->cb_len=0; + }else if (s->cb_len<127) + s->commandbuffer[s->cb_len++]=buff[i]; + continue; + } + + // or watch for "+++" + if (buff[i]=='+'){ + // consume 3 +'s + if (s->state < STATE_PLUSPLUSPLUS){ + s->state++; + }else if(s->txb_lentxbuffer)){ + s->txbuffer[s->txb_len++]=buff[i]; + } + continue; + } + + // regenerate any +'s we consumed + while(s->state > STATE_ONLINE){ + if(s->txb_lentxbuffer)) + s->txbuffer[s->txb_len++]='+'; + s->state--; + } + + // or append to the transmit buffer if there's room + if(s->txb_lentxbuffer)) + s->txbuffer[s->txb_len++]=buff[i]; } + return bytes; +} - // work out how many bytes we can dispatch - long long tx_count_allowed=(gettime_ms()-s->last_tx_ms)*chars_per_ms; +int write_bytes(struct radio_state *s) +{ + int wrote = write(s->fd, s->rxbuffer, s->rxb_len); + if (wrote>0){ + log_time(); + fprintf(stderr, "Wrote to %s\n", s->name); + dump(NULL, s->rxbuffer, wrote); + if (wrote < s->rxb_len) + bcopy(&s->rxbuffer[wrote], s->rxbuffer, s->rxb_len - wrote); + s->rxb_len -= wrote; + } + return wrote; +} - // now go through the TX buffer and dispatch them - // (or change state as appropriate) - for(i=0;itxb_len<1) break; - switch(s->state) { - case STATE_ONLINE: - if (s->txbuffer[0]!='+') { - s->state=STATE_ONLINE; - emitChar(out_fd,s->txbuffer[0]); - } else { s->state=STATE_PLUS; i--; } - break; - case STATE_PLUS: - if (s->txbuffer[0]!='+') { - s->state=STATE_ONLINE; - emit(out_fd,"+"); i+=1; - emitChar(out_fd,s->txbuffer[0]); - } else { s->state=STATE_PLUSPLUS; i--; } - break; - case STATE_PLUSPLUS: - if (s->txbuffer[0]!='+') { - s->state=STATE_ONLINE; - emit(out_fd,"++"); i+=2; - emitChar(out_fd,s->txbuffer[0]); - } else { s->state=STATE_PLUSPLUSPLUS; i--; } - break; - case STATE_PLUSPLUSPLUS: - if (s->txbuffer[0]!='+') { - s->state=STATE_ONLINE; - emit(out_fd,"+++"); i+=3; - } else { - // more than 3 pluses, so start outputting the - // extras - emit(out_fd,"+"); i+=1; - s->state=STATE_PLUSPLUSPLUS; i--; - } - break; - case STATE_COMMAND: - { - emitChar(out_fd,s->txbuffer[0]); - if (s->txbuffer[0]=='\r'||s->txbuffer[0]=='\n') { - // end of command - processCommand(fd,s,out_fd); - s->cb_len=0; - } else { - if (s->cb_len<127) { - s->commandbuffer[s->cb_len++]=s->txbuffer[0]; - } - } +int transmitter=0; +long long next_transmit_time=0; + +int transfer_bytes(struct radio_state *radios) +{ + // if there's data to transmit, copy a radio packet from one device to the other + int receiver = transmitter^1; + struct radio_state *r = &radios[receiver]; + struct radio_state *t = &radios[transmitter]; + int bytes=t->txb_len; + + // TODO detect MAVLINK frame header + // respond to heartbeats? + // only transmit if we have read the entire mavlink packet + + if (bytes > radio_packet_size) + bytes = radio_packet_size; + + if (bytes>0){ + log_time(); + fprintf(stderr, "Transferring %d byte packet from %s to %s\n", bytes, t->name, r->name); + } + int i, j; + for (i=0;irxb_lenrxbuffer);i++){ + char byte = t->txbuffer[i]; + // introduce bit errors + for(j=0;j<8;j++) { + if (random()txb_len>0) { - bcopy(&s->txbuffer[1],&s->txbuffer[0],s->txb_len); - s->txb_len--; - } + r->rxbuffer[r->rxb_len++]=byte; } - // Remember the current time for TX throttling - s->last_tx_ms=gettime_ms(); - - // Output radio link status if requested - if (s->rssi_output&&(gettime_ms()-s->last_rssi_time_ms)>=1000) { - emit(fd,"L/R RSSI: 200/190 L/R noise: 80/70 pkts: 10 txe=0 rxe=0 stx=0 srx=0 ecc=0/0 temp=42 dco=0\r\n"); - s->last_rssi_time_ms=gettime_ms(); - } - - if (print_report) { - s->commandbuffer[s->cb_len]=0; - fprintf(stderr,"Radio #%d state: %d rssi_output=%d cbuf='%s', txb_len=%d\n", - fd,s->state,s->rssi_output,s->commandbuffer,s->txb_len); - } - return 0; + if (bytes>0 && bytes < t->txb_len) + bcopy(&t->txbuffer[bytes], t->txbuffer, t->txb_len - bytes); + t->txb_len-=bytes; + + // swap who's turn it is to transmit + transmitter = receiver; + + // set the wait time for the next transmission + next_transmit_time = gettime_ms() + (bytes+10)/chars_per_ms; + return bytes; } int main(int argc,char **argv) { if (argv[1]) { - chars_per_ms=atoi(argv[1]); + chars_per_ms=atol(argv[1]); if (argv[2]) - ber=atoi(argv[2]); + ber=atol(argv[2]); } - - struct radio_state left_state,right_state; - bzero(&left_state,sizeof left_state); - bzero(&right_state,sizeof right_state); - // set actual throughput to match real RFD900 radios running at 128kbit with golay encoding - // (assumes 70% efficiency for TDMA) - left_state.tx_rate=128000/2*0.7; - right_state.tx_rate=128000/2*0.7; - - int left=posix_openpt(O_RDWR|O_NOCTTY); - grantpt(left); unlockpt(left); - int right=posix_openpt(O_RDWR|O_NOCTTY); - grantpt(right); unlockpt(right); - fprintf(stdout,"%s\n",ptsname(left)); - fprintf(stdout,"%s\n",ptsname(right)); - fflush(stdout); - - fcntl(left,F_SETFL,fcntl(left, F_GETFL, NULL)|O_NONBLOCK); - fcntl(right,F_SETFL,fcntl(right, F_GETFL, NULL)|O_NONBLOCK); + fprintf(stderr, "Sending %d bytes per ms\n", chars_per_ms); + fprintf(stderr, "Introducing %f%% bit errors\n", (ber * 100.0) / 0xFFFFFFFF); struct pollfd fds[2]; + struct radio_state radios[2]; + + bzero(&radios,sizeof radios); + int i; - - fds[0].fd=left; - fds[0].events=POLLIN; - fds[1].fd=right; - fds[1].events=POLLIN; + for (i=0;i<2;i++){ + radios[i].fd=posix_openpt(O_RDWR|O_NOCTTY); + grantpt(radios[i].fd); + unlockpt(radios[i].fd); + fcntl(radios[i].fd,F_SETFL,fcntl(radios[i].fd, F_GETFL, NULL)|O_NONBLOCK); + fprintf(stdout,"%s\n",ptsname(radios[i].fd)); + fds[i].fd = radios[i].fd; + } + radios[0].name="left"; + radios[1].name="right"; + fflush(stdout); while(1) { - poll(fds,2,10); - updateState(left,&left_state,right); - updateState(right,&right_state,left); - for(i=0;i<2;i++) { - fds[i].revents=0; - if (fds[i].revents&~POLLIN) - printf("revents %x\n", fds[i].revents); + // what events do we need to poll for? how long can we block? + long long now = gettime_ms(); + long long next_event = now+10000; + + for (i=0;i<2;i++){ + // always watch for incoming data, though we will throw it away if we run out of buffer space + fds[i].events = POLLIN; + // if we have data to write data, watch for POLLOUT too. + if (radios[i].rxb_len) + fds[i].events |= POLLOUT; + + if (radios[i].rssi_output && next_event > radios[i].next_rssi_time_ms) + next_event = radios[i].next_rssi_time_ms; + + if (radios[i].state==STATE_PLUSPLUSPLUS && next_event > radios[i].last_char_ms+1000) + next_event = radios[i].last_char_ms+1000; + + if (radios[i].txb_len && next_event > next_transmit_time) + next_event = next_transmit_time; } + + int delay = next_event - now; + if (delay<0) + delay=0; + + poll(fds,2,delay); + + for (i=0;i<2;i++){ + + if (fds[i].revents & POLLIN) + read_bytes(&radios[i]); + + if (fds[i].revents & POLLOUT) + write_bytes(&radios[i]); + + now = gettime_ms(); + if (radios[i].rssi_output && now >= radios[i].next_rssi_time_ms){ + if (append_bytes(&radios[i], "L/R RSSI: 200/190 L/R noise: 80/70 pkts: 10 txe=0 rxe=0 stx=0 srx=0 ecc=0/0 temp=42 dco=0\r\n", -1)>0) + radios[i].next_rssi_time_ms=now+1000; + } + + if (radios[i].state==STATE_PLUSPLUSPLUS && now >= radios[i].last_char_ms+1000){ + fprintf(stderr, "Detected +++ from %s\n",radios[i].name); + if (append_bytes(&radios[i], "OK\r\n", -1)>0) + radios[i].state=STATE_COMMAND; + } + } + + if (now >= next_transmit_time) + transfer_bytes(radios); } return 0; diff --git a/tests/routing b/tests/routing index cf94a754..2c12183a 100755 --- a/tests/routing +++ b/tests/routing @@ -218,7 +218,7 @@ setup_simulate_extender() { setup_servald assert_no_servald_processes foreach_instance +A +B create_single_identity - $servald_build_root/fakeradio > "$SERVALD_VAR/radioout" 2> "$SERVALD_VAR/radioerr" & + $servald_build_root/fakeradio 1 10000000 > "$SERVALD_VAR/radioout" 2> "$SERVALD_VAR/radioerr" & FAKERADIO_PID=$! sleep 1 local END1=`head "$SERVALD_VAR/radioout" -n 1`