Fix bug failed dnaprotocol tests non-deterministically

Appending to the dummy interface file was altering the read offset position, so
very many receive packets were being lost.
This commit is contained in:
Andrew Bettison 2012-07-31 17:49:24 +09:30
parent ac12eac5bc
commit 76cfb7beae
2 changed files with 82 additions and 84 deletions

View File

@ -180,33 +180,32 @@ void overlay_interface_close(overlay_interface *interface){
} }
int int
overlay_interface_init_socket(int interface) { overlay_interface_init_socket(int interface_index)
char srctxt[INET_ADDRSTRLEN]; {
overlay_interface *const interface = &overlay_interfaces[interface_index];
interface->fileP = 0;
#define I(X) overlay_interfaces[interface].X interface->alarm.poll.fd = socket(PF_INET,SOCK_DGRAM,0);
I(fileP) = 0;
I(alarm.poll.fd) = socket(PF_INET,SOCK_DGRAM,0);
if (I(alarm.poll.fd) < 0) { if (interface->alarm.poll.fd < 0) {
WHY_perror("socket"); WHY_perror("socket");
WHYF("Could not create UDP socket for interface: %s",strerror(errno)); WHYF("Could not create UDP socket for interface: %s",strerror(errno));
goto error; goto error;
} }
int reuseP = 1; int reuseP = 1;
if (setsockopt(I(alarm.poll.fd), SOL_SOCKET, SO_REUSEADDR, &reuseP, sizeof(reuseP)) < 0) { if (setsockopt(interface->alarm.poll.fd, SOL_SOCKET, SO_REUSEADDR, &reuseP, sizeof(reuseP)) < 0) {
WHY_perror("setsockopt(SO_REUSEADR)"); WHY_perror("setsockopt(SO_REUSEADR)");
goto error; goto error;
} }
#ifdef SO_REUSEPORT #ifdef SO_REUSEPORT
if (setsockopt(I(alarm.poll.fd), SOL_SOCKET, SO_REUSEPORT, &reuseP, sizeof(reuseP)) < 0) { if (setsockopt(interface->alarm.poll.fd, SOL_SOCKET, SO_REUSEPORT, &reuseP, sizeof(reuseP)) < 0) {
WHY_perror("setsockopt(SO_REUSEPORT)"); WHY_perror("setsockopt(SO_REUSEPORT)");
goto error; goto error;
} }
#endif #endif
int broadcastP = 1; int broadcastP = 1;
if (setsockopt(I(alarm.poll.fd), SOL_SOCKET, SO_BROADCAST, &broadcastP, sizeof(broadcastP)) < 0) { if (setsockopt(interface->alarm.poll.fd, SOL_SOCKET, SO_BROADCAST, &broadcastP, sizeof(broadcastP)) < 0) {
WHY_perror("setsockopt(SO_BROADCAST)"); WHY_perror("setsockopt(SO_BROADCAST)");
goto error; goto error;
} }
@ -214,7 +213,7 @@ overlay_interface_init_socket(int interface) {
/* Automatically close socket on calls to exec(). /* Automatically close socket on calls to exec().
This makes life easier when we restart with an exec after receiving This makes life easier when we restart with an exec after receiving
a bad signal. */ a bad signal. */
fcntl(I(alarm.poll.fd), F_SETFL, fcntl(I(alarm.poll.fd), F_GETFL, NULL) | O_CLOEXEC); fcntl(interface->alarm.poll.fd, F_SETFL, fcntl(interface->alarm.poll.fd, F_GETFL, NULL) | O_CLOEXEC);
#ifdef SO_BINDTODEVICE #ifdef SO_BINDTODEVICE
/* /*
@ -222,7 +221,7 @@ overlay_interface_init_socket(int interface) {
This should allow for a device with multiple interfaces on the same subnet. This should allow for a device with multiple interfaces on the same subnet.
Don't abort if this fails, just log it. Don't abort if this fails, just log it.
*/ */
if (setsockopt(I(alarm.poll.fd), SOL_SOCKET, SO_BINDTODEVICE, I(name), strlen(I(name))+1) < 0) { if (setsockopt(interface->alarm.poll.fd, SOL_SOCKET, SO_BINDTODEVICE, interface->name, strlen(interface->name)+1) < 0) {
WHY_perror("setsockopt(SO_BINDTODEVICE)"); WHY_perror("setsockopt(SO_BINDTODEVICE)");
} }
#endif #endif
@ -232,37 +231,41 @@ overlay_interface_init_socket(int interface) {
traffic on all platforms. BUT on OSX we really need a non-broadcast socket traffic on all platforms. BUT on OSX we really need a non-broadcast socket
to send from, because you cannot send from a broadcast socket on OSX it seems. to send from, because you cannot send from a broadcast socket on OSX it seems.
*/ */
I(broadcast_address.sin_family) = AF_INET; interface->broadcast_address.sin_family = AF_INET;
I(broadcast_address.sin_port) = htons(I(port)); interface->broadcast_address.sin_port = htons(interface->port);
if (bind(I(alarm.poll.fd), (const struct sockaddr *)&I(broadcast_address), sizeof(I(broadcast_address)))) { if (bind(interface->alarm.poll.fd, (const struct sockaddr *)&interface->broadcast_address, sizeof(interface->broadcast_address))) {
WHY_perror("bind"); WHY_perror("bind");
WHY("MP HLR server could not bind to requested UDP port (bind() failed)"); WHY("MP HLR server could not bind to requested UDP port (bind() failed)");
goto error; goto error;
} }
assert(inet_ntop(AF_INET, (const void *)&I(broadcast_address.sin_addr), srctxt, INET_ADDRSTRLEN) != NULL); char srctxt[INET_ADDRSTRLEN];
if (debug & (DEBUG_PACKETRX | DEBUG_IO)) DEBUGF("Bound to %s:%d", srctxt, ntohs(I(broadcast_address.sin_port))); if (inet_ntop(AF_INET, (const void *)&interface->broadcast_address.sin_addr, srctxt, INET_ADDRSTRLEN) == NULL) {
WHY_perror("inet_ntop");
goto error;
}
if (debug & (DEBUG_PACKETRX | DEBUG_IO))
DEBUGF("Bound to %s:%d", srctxt, ntohs(interface->broadcast_address.sin_port));
I(alarm.poll.events)=POLLIN; interface->alarm.poll.events=POLLIN;
I(alarm.function) = overlay_interface_poll; interface->alarm.function = overlay_interface_poll;
interface_poll_stats.name="overlay_interface_poll"; interface_poll_stats.name="overlay_interface_poll";
I(alarm.stats)=&interface_poll_stats; interface->alarm.stats=&interface_poll_stats;
watch(&I(alarm)); watch(&interface->alarm);
// run the first tick asap // run the first tick asap
I(alarm.alarm)=gettime_ms(); interface->alarm.alarm=gettime_ms();
I(alarm.deadline)=I(alarm.alarm)+10; interface->alarm.deadline=interface->alarm.alarm+10;
schedule(&I(alarm)); schedule(&interface->alarm);
I(state)=INTERFACE_STATE_UP; interface->state=INTERFACE_STATE_UP;
INFOF("Interface %s addr %s, is up",I(name), inet_ntoa(I(broadcast_address.sin_addr))); INFOF("Interface %s addr %s, is up",interface->name, inet_ntoa(interface->broadcast_address.sin_addr));
return 0; return 0;
error: error:
overlay_interface_close(&overlay_interfaces[interface]); overlay_interface_close(interface);
return -1; return -1;
#undef I
} }
int overlay_interface_init(char *name,struct sockaddr_in *src_addr,struct sockaddr_in *broadcast, int overlay_interface_init(char *name,struct sockaddr_in *src_addr,struct sockaddr_in *broadcast,
@ -271,38 +274,38 @@ int overlay_interface_init(char *name,struct sockaddr_in *src_addr,struct sockad
/* Too many interfaces */ /* Too many interfaces */
if (overlay_interface_count>=OVERLAY_MAX_INTERFACES) return WHY("Too many interfaces -- Increase OVERLAY_MAX_INTERFACES"); if (overlay_interface_count>=OVERLAY_MAX_INTERFACES) return WHY("Too many interfaces -- Increase OVERLAY_MAX_INTERFACES");
#define I(X) overlay_interfaces[overlay_interface_count].X overlay_interface *const interface = &overlay_interfaces[overlay_interface_count];
strcpy(I(name),name); strcpy(interface->name,name);
/* Pick a reasonable default MTU. /* Pick a reasonable default MTU.
This will ultimately get tuned by the bandwidth and other properties of the interface */ This will ultimately get tuned by the bandwidth and other properties of the interface */
I(mtu)=1200; interface->mtu=1200;
I(state)=INTERFACE_STATE_DOWN; interface->state=INTERFACE_STATE_DOWN;
I(bits_per_second)=speed_in_bits; interface->bits_per_second=speed_in_bits;
I(port)=port; interface->port=port;
I(type)=type; interface->type=type;
I(last_tick_ms)= -1; // not ticked yet interface->last_tick_ms= -1; // not ticked yet
I(alarm.poll.fd)=0; interface->alarm.poll.fd=0;
switch (type) { switch (type) {
case OVERLAY_INTERFACE_PACKETRADIO: case OVERLAY_INTERFACE_PACKETRADIO:
I(tick_ms) = confValueGetInt64Range("mdp.packetradio.tick_ms", 15000LL, 1LL, 3600000LL); interface->tick_ms = confValueGetInt64Range("mdp.packetradio.tick_ms", 15000LL, 1LL, 3600000LL);
break; break;
case OVERLAY_INTERFACE_ETHERNET: case OVERLAY_INTERFACE_ETHERNET:
I(tick_ms) = confValueGetInt64Range("mdp.ethernet.tick_ms", 500LL, 1LL, 3600000LL); interface->tick_ms = confValueGetInt64Range("mdp.ethernet.tick_ms", 500LL, 1LL, 3600000LL);
break; break;
case OVERLAY_INTERFACE_WIFI: case OVERLAY_INTERFACE_WIFI:
I(tick_ms) = confValueGetInt64Range("mdp.wifi.tick_ms", 500LL, 1LL, 3600000LL); interface->tick_ms = confValueGetInt64Range("mdp.wifi.tick_ms", 500LL, 1LL, 3600000LL);
break; break;
case OVERLAY_INTERFACE_UNKNOWN: case OVERLAY_INTERFACE_UNKNOWN:
I(tick_ms) = confValueGetInt64Range("mdp.unknown.tick_ms", 500LL, 1LL, 3600000LL); interface->tick_ms = confValueGetInt64Range("mdp.unknown.tick_ms", 500LL, 1LL, 3600000LL);
break; break;
default: default:
return WHYF("Unsupported interface type %d", type); return WHYF("Unsupported interface type %d", type);
} }
if (name[0]=='>') { if (name[0]=='>') {
I(fileP)=1; interface->fileP=1;
char dummyfile[1024]; char dummyfile[1024];
if (name[1]=='/') { if (name[1]=='/') {
/* Absolute path */ /* Absolute path */
@ -312,35 +315,34 @@ int overlay_interface_init(char *name,struct sockaddr_in *src_addr,struct sockad
if (!FORM_SERVAL_INSTANCE_PATH(dummyfile, &name[1])) if (!FORM_SERVAL_INSTANCE_PATH(dummyfile, &name[1]))
return WHY("could not form dummy interfance name"); return WHY("could not form dummy interfance name");
if ((I(alarm.poll.fd) = open(dummyfile,O_APPEND|O_RDWR)) < 1) { if ((interface->alarm.poll.fd = open(dummyfile,O_APPEND|O_RDWR)) < 1) {
return WHY("could not open dummy interface file for append"); return WHY("could not open dummy interface file for append");
} }
/* Seek to end of file as initial reading point */ /* Seek to end of file as initial reading point */
I(offset)=lseek(I(alarm.poll.fd),0,SEEK_END); /* socket gets reused to hold file offset */ interface->recv_offset = lseek(interface->alarm.poll.fd,0,SEEK_END);
/* XXX later add pretend location information so that we can decide which "packets" to receive /* XXX later add pretend location information so that we can decide which "packets" to receive
based on closeness */ based on closeness */
// schedule an alarm for this interface // schedule an alarm for this interface
I(alarm.function)=overlay_dummy_poll; interface->alarm.function=overlay_dummy_poll;
I(alarm.alarm)=gettime_ms()+10; interface->alarm.alarm=gettime_ms()+10;
I(alarm.deadline)=I(alarm.alarm); interface->alarm.deadline=interface->alarm.alarm;
dummy_poll_stats.name="overlay_dummy_poll"; dummy_poll_stats.name="overlay_dummy_poll";
I(alarm.stats)=&dummy_poll_stats; interface->alarm.stats=&dummy_poll_stats;
schedule(&I(alarm)); schedule(&interface->alarm);
I(state)=INTERFACE_STATE_UP; interface->state=INTERFACE_STATE_UP;
INFOF("Dummy interface %s is up",I(name)); INFOF("Dummy interface %s is up",interface->name);
} else { } else {
bcopy(src_addr, &I(address), sizeof(struct sockaddr_in)); bcopy(src_addr, &interface->address, sizeof(struct sockaddr_in));
bcopy(broadcast, &I(broadcast_address), sizeof(struct sockaddr_in)); bcopy(broadcast, &interface->broadcast_address, sizeof(struct sockaddr_in));
if (overlay_interface_init_socket(overlay_interface_count)) if (overlay_interface_init_socket(overlay_interface_count))
return WHY("overlay_interface_init_socket() failed"); return WHY("overlay_interface_init_socket() failed");
} }
overlay_interface_count++; overlay_interface_count++;
#undef I
return 0; return 0;
} }
@ -396,7 +398,7 @@ void overlay_dummy_poll(struct sched_ent *alarm)
/* XXX Okay, so how are we managing out-of-process consumers? /* XXX Okay, so how are we managing out-of-process consumers?
They need some way to register their interest in listening to a port. They need some way to register their interest in listening to a port.
*/ */
unsigned char packet[16384]; unsigned char packet[2048];
int plen=0; int plen=0;
struct sockaddr src_addr; struct sockaddr src_addr;
size_t addrlen = sizeof(src_addr); size_t addrlen = sizeof(src_addr);
@ -411,28 +413,27 @@ void overlay_dummy_poll(struct sched_ent *alarm)
/* Read from dummy interface file */ /* Read from dummy interface file */
long long length=lseek(alarm->poll.fd,0,SEEK_END); long long length=lseek(alarm->poll.fd,0,SEEK_END);
if (interface->offset>=length) if (interface->recv_offset >= length) {
{ /* if there's no input, while we want to check for more soon,
/* if there's no input, while we want to check for more soon, we need to allow all other low priority alarms to fire first,
we need to allow all other low priority alarms to fire first, otherwise we'll dominate the scheduler without accomplishing anything */
otherwise we'll dominate the scheduler without accomplishing anything */ alarm->alarm = gettime_ms() + 20;
alarm->alarm = gettime_ms() + 20; if (interface->last_tick_ms != -1 && alarm->alarm > interface->last_tick_ms + interface->tick_ms)
if (interface->last_tick_ms != -1 && alarm->alarm > interface->last_tick_ms + interface->tick_ms) alarm->alarm = interface->last_tick_ms + interface->tick_ms;
alarm->alarm = interface->last_tick_ms + interface->tick_ms; alarm->deadline = alarm->alarm + 10000;
alarm->deadline = alarm->alarm + 10000; } else {
} if (lseek(alarm->poll.fd,interface->recv_offset,SEEK_SET) == -1)
else WHY_perror("lseek");
{ else {
lseek(alarm->poll.fd,interface->offset,SEEK_SET);
if (debug&DEBUG_OVERLAYINTERFACES) if (debug&DEBUG_OVERLAYINTERFACES)
DEBUGF("Read interface %s (size=%lld) at offset=%d",interface->name, length, interface->offset); DEBUGF("Read interface %s (size=%lld) at offset=%d",interface->name, length, interface->recv_offset);
ssize_t nread = read(alarm->poll.fd,&packet[0],2048); ssize_t nread = read(alarm->poll.fd, packet, sizeof packet);
if (nread == -1) if (nread == -1)
WHY_perror("read"); WHY_perror("read");
else { else {
interface->offset += nread; interface->recv_offset += nread;
if (nread == 2048) { if (nread == sizeof packet) {
plen = packet[110]+(packet[111]<<8); plen = packet[110] + (packet[111] << 8);
if (plen > nread - 128) if (plen > nread - 128)
plen = -1; plen = -1;
if (debug&DEBUG_PACKETRX) if (debug&DEBUG_PACKETRX)
@ -453,13 +454,12 @@ void overlay_dummy_poll(struct sched_ent *alarm)
else else
WARNF("Read %lld bytes from dummy interface", nread); WARNF("Read %lld bytes from dummy interface", nread);
} }
/* keep reading new packets as fast as possible,
but don't prevent other high priority alarms */
alarm->alarm = gettime_ms();
alarm->deadline = alarm->alarm + 200;
} }
/* keep reading new packets as fast as possible,
but don't prevent other high priority alarms */
alarm->alarm = gettime_ms();
alarm->deadline = alarm->alarm + 200;
}
schedule(alarm); schedule(alarm);
@ -538,13 +538,11 @@ int overlay_broadcast_ensemble(int interface_number,
off_t fsize = lseek(interface->alarm.poll.fd, (off_t) 0, SEEK_END); off_t fsize = lseek(interface->alarm.poll.fd, (off_t) 0, SEEK_END);
if (fsize == -1) if (fsize == -1)
return WHY_perror("lseek"); return WHY_perror("lseek");
interface->offset = fsize;
if (debug&DEBUG_OVERLAYINTERFACES) if (debug&DEBUG_OVERLAYINTERFACES)
DEBUGF("Write to interface %s at offset=%d", interface->name, interface->offset); DEBUGF("Write to interface %s at offset=%d", interface->name, fsize);
ssize_t nwrite = write(interface->alarm.poll.fd, buf, 2048); ssize_t nwrite = write(interface->alarm.poll.fd, buf, 2048);
if (nwrite == -1) if (nwrite == -1)
return WHY_perror("write"); return WHY_perror("write");
interface->offset += nwrite;
if (nwrite != 2048) if (nwrite != 2048)
return WHYF("only wrote %lld of %lld bytes", nwrite, 2048); return WHYF("only wrote %lld of %lld bytes", nwrite, 2048);
return 0; return 0;

View File

@ -377,7 +377,7 @@ extern int overlayMode;
typedef struct overlay_interface { typedef struct overlay_interface {
struct sched_ent alarm; struct sched_ent alarm;
char name[80]; char name[80];
int offset; int recv_offset;
int fileP; int fileP;
int bits_per_second; int bits_per_second;
int port; int port;