Refactor interface handling to separate encapsulation from stream type

This commit is contained in:
Jeremy Lakeman 2013-02-14 14:18:56 +10:30
parent 95f2ed3225
commit 6d9bbe2e2c
19 changed files with 668 additions and 701 deletions

View File

@ -319,6 +319,36 @@ int cf_opt_interface_type(short *typep, const char *text)
return CFINVALID;
}
int cf_opt_socket_type(int *typep, const char *text)
{
if (strcasecmp(text, "dgram") == 0) {
*typep = SOCK_DGRAM;
return CFOK;
}
if (strcasecmp(text, "stream") == 0) {
*typep = SOCK_STREAM;
return CFOK;
}
if (strcasecmp(text, "file") == 0) {
*typep = SOCK_FILE;
return CFOK;
}
return CFINVALID;
}
int cf_opt_encapsulation(int *typep, const char *text)
{
if (strcasecmp(text, "overlay") == 0) {
*typep = ENCAP_OVERLAY;
return CFOK;
}
if (strcasecmp(text, "single") == 0) {
*typep = ENCAP_SINGLE;
return CFOK;
}
return CFINVALID;
}
int cf_opt_pattern_list(struct pattern_list *listp, const char *text)
{
struct pattern_list list;
@ -391,9 +421,9 @@ static int cf_opt_network_interface_legacy(struct config_network_interface *nifp
p = endtext;
size_t len = p - name;
if (name[0] == '>') {
if (len - 1 >= sizeof(nif.dummy))
if (len - 1 >= sizeof(nif.file))
return CFSTRINGOVERFLOW;
strncpy(nif.dummy, &name[1], len - 1)[len - 1] = '\0';
strncpy(nif.file, &name[1], len - 1)[len - 1] = '\0';
nif.match.patc = 0;
} else {
int star = (strchr(name, '*') != NULL) ? 1 : 0;
@ -460,19 +490,41 @@ int cf_opt_network_interface(struct config_network_interface *nifp, const struct
int vld_network_interface(const struct cf_om_node *parent, struct config_network_interface *nifp, int result)
{
if (nifp->match.patc != 0 && nifp->dummy[0]) {
if (nifp->match.patc != 0 && nifp->file[0]) {
int nodei_match = cf_om_get_child(parent, "match", NULL);
int nodei_dummy = cf_om_get_child(parent, "dummy", NULL);
int nodei_file = cf_om_get_child(parent, "file", NULL);
assert(nodei_match != -1);
assert(nodei_dummy != -1);
cf_warn_incompatible(parent->nodv[nodei_match], parent->nodv[nodei_dummy]);
assert(nodei_file != -1);
cf_warn_incompatible(parent->nodv[nodei_match], parent->nodv[nodei_file]);
return result | CFSUB(CFINCOMPATIBLE);
}
if (nifp->match.patc == 0 && !nifp->dummy[0]) {
DEBUGF("dummy=%s", alloca_str_toprint(nifp->dummy));
cf_warn_missing_node(parent, "match");
return result | CFINCOMPLETE;
if (nifp->socket_type == SOCK_UNSPECIFIED){
if (nifp->match.patc != 0)
nifp->socket_type=SOCK_DGRAM;
else if (nifp->file[0])
nifp->socket_type=SOCK_FILE;
else{
cf_warn_missing_node(parent, "match");
return result | CFINCOMPLETE;
}
}else{
if (nifp->socket_type==SOCK_DGRAM && nifp->file[0]){
int nodei_socket_type = cf_om_get_child(parent, "socket_type", NULL);
int nodei_file = cf_om_get_child(parent, "file", NULL);
assert(nodei_socket_type != -1);
assert(nodei_file != -1);
cf_warn_incompatible(parent->nodv[nodei_socket_type], parent->nodv[nodei_file]);
return result | CFSUB(CFINCOMPATIBLE);
}
if (nifp->socket_type!=SOCK_DGRAM && !nifp->file[0]){
cf_warn_missing_node(parent, "file");
return result | CFSUB(CFINCOMPATIBLE);
}
}
return result;
}

View File

@ -218,7 +218,7 @@ END_STRUCT
STRUCT(server)
STRING(256, chdir, "/", cf_opt_absolute_path,, "Absolute path of chdir(2) for server process")
STRING(256, dummy_interface_dir, "", cf_opt_str_nonempty,, "Path of directory containing dummy interface files, either absolute or relative to instance directory")
STRING(256, interface_path, "", cf_opt_str_nonempty,, "Path of directory containing interface files, either absolute or relative to instance directory")
ATOM(int, respawn_on_crash, 0, cf_opt_int_boolean,, "If true, server will exec(2) itself on fatal signals, eg SEGV")
END_STRUCT
@ -332,13 +332,15 @@ END_ARRAY(32)
STRUCT(network_interface, vld_network_interface)
ATOM(int, exclude, 0, cf_opt_int_boolean,, "If true, do not use matching interfaces")
ATOM(struct pattern_list, match, PATTERN_LIST_EMPTY, cf_opt_pattern_list,, "Names that match network interface")
STRING(256, dummy, "", cf_opt_str_nonempty,, "Path of dummy file, absolute or relative to server.dummy_interface_dir")
ATOM(int, socket_type, SOCK_UNSPECIFIED, cf_opt_socket_type,, "Type of network socket")
ATOM(int, encapsulation, ENCAP_OVERLAY, cf_opt_encapsulation,, "Type of packet encapsulation")
STRING(256, file, "", cf_opt_str_nonempty,, "Path of interface file, absolute or relative to server.interface_path")
ATOM(struct in_addr, dummy_address, hton_in_addr(INADDR_LOOPBACK), cf_opt_in_addr,, "Dummy interface address")
ATOM(struct in_addr, dummy_netmask, hton_in_addr(0xFFFFFF00), cf_opt_in_addr,, "Dummy interface netmask")
ATOM(char, dummy_filter_broadcasts, 0, cf_opt_char_boolean,, "If true, drop all incoming broadcast packets")
ATOM(char, dummy_filter_unicasts, 0, cf_opt_char_boolean,, "If true, drop all incoming unicast packets")
ATOM(uint16_t, port, PORT_DNA, cf_opt_uint16_nonzero,, "Port number for network interface")
ATOM(char, drop_broadcasts, 0, cf_opt_char_boolean,, "If true, drop all incoming broadcast packets")
ATOM(char, drop_unicasts, 0, cf_opt_char_boolean,, "If true, drop all incoming unicast packets")
ATOM(short, type, OVERLAY_INTERFACE_WIFI, cf_opt_interface_type,, "Type of network interface")
ATOM(uint16_t, port, RHIZOME_HTTP_PORT, cf_opt_uint16_nonzero,, "Port number for network interface")
ATOM(int, packet_interval, -1, cf_opt_int,, "Minimum interval between packets in microseconds")
ATOM(int, mdp_tick_ms, -1, cf_opt_int32_nonneg,, "Override MDP tick interval for this interface")
ATOM(char, send_broadcasts, 1, cf_opt_char_boolean,, "If false, don't send any broadcast packets")

View File

@ -206,4 +206,10 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#define DEFAULT_MONITOR_SOCKET_NAME "org.servalproject.servald.monitor.socket"
#define DEFAULT_MDP_SOCKET_NAME "org.servalproject.servald.mdp.socket"
#define SOCK_FILE 0xFF
#define SOCK_UNSPECIFIED 0
#define ENCAP_OVERLAY 1
#define ENCAP_SINGLE 2
#endif // __SERVALD_CONSTANTS_H

2
log.c
View File

@ -361,6 +361,7 @@ ssize_t get_self_executable_path(char *buf, size_t len)
int log_backtrace(struct __sourceloc whence)
{
#ifndef NO_BACKTRACE
open_logging();
char execpath[MAXPATHLEN];
if (get_self_executable_path(execpath, sizeof execpath) == -1)
@ -458,5 +459,6 @@ int log_backtrace(struct __sourceloc whence)
strbuf_append_exit_status(b, status);
logMessage(LOG_LEVEL_DEBUG, __NOWHERE__, "gdb %s", buf);
unlink(tempfile);
#endif
return 0;
}

View File

@ -237,6 +237,10 @@ int ob_append_bytes(struct overlay_buffer *b,unsigned char *bytes,int count)
return 0;
}
int ob_append_buffer(struct overlay_buffer *b,struct overlay_buffer *s){
return ob_append_bytes(b, s->bytes, s->position);
}
int ob_append_ui16(struct overlay_buffer *b, uint16_t v)
{
if (ob_makespace(b, 2)) return WHY("ob_makespace() failed");

View File

@ -54,6 +54,7 @@ int ob_unlimitsize(struct overlay_buffer *b);
int ob_makespace(struct overlay_buffer *b,int bytes);
int ob_append_byte(struct overlay_buffer *b,unsigned char byte);
int ob_append_bytes(struct overlay_buffer *b,unsigned char *bytes,int count);
int ob_append_buffer(struct overlay_buffer *b,struct overlay_buffer *s);
unsigned char *ob_append_space(struct overlay_buffer *b,int count);
int ob_append_ui16(struct overlay_buffer *b, uint16_t v);
int ob_append_ui32(struct overlay_buffer *b, uint32_t v);

View File

@ -42,7 +42,6 @@ overlay_interface overlay_interfaces[OVERLAY_MAX_INTERFACES];
int overlay_last_interface_number=-1;
struct profile_total interface_poll_stats;
struct profile_total dummy_poll_stats;
struct sched_ent sock_any;
struct sockaddr_in sock_any_addr;
@ -265,7 +264,6 @@ static int
overlay_interface_init_socket(int interface_index)
{
overlay_interface *const interface = &overlay_interfaces[interface_index];
interface->fileP = 0;
/*
On linux you can bind to the broadcast address to receive broadcast packets per interface [or subnet],
@ -297,25 +295,7 @@ overlay_interface_init_socket(int interface_index)
}
interface->alarm.poll.events=POLLIN;
interface->alarm.function = overlay_interface_poll;
interface_poll_stats.name="overlay_interface_poll";
interface->alarm.stats=&interface_poll_stats;
watch(&interface->alarm);
if (interface->tick_ms>0){
// run the first tick asap
interface->alarm.alarm=gettime_ms();
interface->alarm.deadline=interface->alarm.alarm+10;
schedule(&interface->alarm);
}
interface->state=INTERFACE_STATE_UP;
INFOF("Interface %s addr %s:%d, is up",interface->name,
inet_ntoa(interface->address.sin_addr), ntohs(interface->address.sin_port));
directory_registration();
return 0;
}
@ -331,35 +311,44 @@ overlay_interface_init(const char *name, struct in_addr src_addr, struct in_addr
strncpy(interface->name, name, sizeof interface->name);
// copy ifconfig values
interface->drop_broadcasts = ifconfig->drop_broadcasts;
interface->drop_unicasts = ifconfig->drop_unicasts;
interface->port = ifconfig->port;
interface->type = ifconfig->type;
interface->send_broadcasts = ifconfig->send_broadcasts;
interface->prefer_unicast = ifconfig->prefer_unicast;
interface->default_route = ifconfig->default_route;
interface->socket_type = ifconfig->socket_type;
interface->encapsulation = ifconfig->encapsulation;
/* Pick a reasonable default MTU.
This will ultimately get tuned by the bandwidth and other properties of the interface */
interface->mtu=1200;
interface->state=INTERFACE_STATE_DOWN;
interface->port= ifconfig->port;
interface->type= ifconfig->type;
interface->default_route = ifconfig->default_route;
interface->last_tick_ms= -1; // not ticked yet
interface->alarm.poll.fd=0;
// How often do we announce ourselves on this interface?
interface->tick_ms=-1;
int tick_ms=-1;
int packet_interval=-1;
// hard coded defaults:
switch (ifconfig->type) {
case OVERLAY_INTERFACE_PACKETRADIO:
interface->tick_ms = 15000;
tick_ms = 15000;
packet_interval = 1000;
break;
case OVERLAY_INTERFACE_ETHERNET:
interface->tick_ms = 500;
tick_ms = 500;
packet_interval = 100;
break;
case OVERLAY_INTERFACE_WIFI:
interface->tick_ms = 500;
tick_ms = 500;
packet_interval = 400;
break;
case OVERLAY_INTERFACE_UNKNOWN:
interface->tick_ms = 500;
tick_ms = 500;
packet_interval = 100;
break;
}
@ -368,193 +357,338 @@ overlay_interface_init(const char *name, struct in_addr src_addr, struct in_addr
int i = config_mdp_iftypelist__get(&config.mdp.iftype, &ifconfig->type);
if (i != -1){
if (config.mdp.iftype.av[i].value.tick_ms>=0)
interface->tick_ms = config.mdp.iftype.av[i].value.tick_ms;
tick_ms = config.mdp.iftype.av[i].value.tick_ms;
if (config.mdp.iftype.av[i].value.packet_interval>=0)
packet_interval=config.mdp.iftype.av[i].value.packet_interval;
}
}
// specific value for this interface
if (ifconfig->mdp_tick_ms>=0)
interface->tick_ms = ifconfig->mdp_tick_ms;
tick_ms = ifconfig->mdp_tick_ms;
if (ifconfig->packet_interval>=0)
packet_interval=ifconfig->packet_interval;
interface->send_broadcasts=ifconfig->send_broadcasts;
if (packet_interval<0)
return WHYF("Invalid packet interval %d specified for interface %s", packet_interval, name);
if (packet_interval==0){
INFOF("Interface %s is not sending any traffic!", name);
interface->send_broadcasts=0;
interface->tick_ms=0;
tick_ms=0;
}else if (!interface->send_broadcasts){
INFOF("Interface %s is not sending any broadcast traffic!", name);
// no broadcast traffic implies no ticks
interface->tick_ms=0;
}else if (interface->tick_ms==0)
tick_ms=0;
}else if (tick_ms==0)
INFOF("Interface %s is running tickless", name);
if (interface->tick_ms<0)
if (tick_ms<0)
return WHYF("Invalid tick interval %d specified for interface %s", interface->tick_ms, name);
interface->tick_ms = tick_ms;
limit_init(&interface->transfer_limit, packet_interval);
interface->prefer_unicast = ifconfig->prefer_unicast;
interface->address.sin_family=AF_INET;
interface->address.sin_port = htons(ifconfig->port);
interface->address.sin_addr = ifconfig->dummy_address;
if (ifconfig->dummy[0]||ifconfig->type==OVERLAY_INTERFACE_PACKETRADIO) {
interface->fileP = 1;
char dummyfile[1024];
if (ifconfig->type==OVERLAY_INTERFACE_PACKETRADIO) {
if (config.debug.packetradio) DEBUGF("Considering packet radio interface %s",
ifconfig->dummy);
snprintf(dummyfile, 1024, "%s",ifconfig->dummy);
} else {
strbuf d = strbuf_local(dummyfile, sizeof dummyfile);
strbuf_path_join(d, serval_instancepath(), config.server.dummy_interface_dir, ifconfig->dummy, NULL);
if (strbuf_overrun(d))
return WHYF("dummy or packet radio interface file name overrun: %s", alloca_str_toprint(strbuf_str(d)));
}
if ((interface->alarm.poll.fd = open(dummyfile,O_APPEND|O_RDWR)) < 1) {
return WHYF("could not open dummy or packet radio interface file %s for append (errno=%d)", dummyfile,errno);
}
if (interface->type==OVERLAY_INTERFACE_PACKETRADIO) {
if (config.debug.packetradio) DEBUGF("Opened packet radio interface");
overlay_packetradio_setup_port(interface);
}
bzero(&interface->address, sizeof(interface->address));
interface->address.sin_family=AF_INET;
interface->address.sin_port = htons(PORT_DNA);
interface->address.sin_addr = ifconfig->dummy_address;
interface->netmask=ifconfig->dummy_netmask;
bzero(&interface->broadcast_address, sizeof(interface->address));
interface->broadcast_address.sin_family=AF_INET;
interface->broadcast_address.sin_port = htons(PORT_DNA);
interface->broadcast_address.sin_addr.s_addr = interface->address.sin_addr.s_addr | ~interface->netmask.s_addr;
interface->drop_broadcasts = ifconfig->dummy_filter_broadcasts;
interface->drop_unicasts = ifconfig->dummy_filter_unicasts;
if (interface->type!=OVERLAY_INTERFACE_PACKETRADIO) {
/* Seek to end of file as initial reading point */
interface->recv_offset = lseek(interface->alarm.poll.fd,0,SEEK_END);
/* XXX later add pretend location information so that we can decide which "packets" to receive
based on closeness */
} else {
// Setup packet radio device
// XXX This needs to be parameterised at some point
// Make sure it is not in command mode
write(interface->alarm.poll.fd,"ATO\r",4);
if (config.debug.packetradio)
DEBUGF("Sent ATO to make sure we are in on-line mode");
}
// schedule an alarm for this interface
if (interface->type!=OVERLAY_INTERFACE_PACKETRADIO) {
interface->alarm.function=overlay_dummy_poll;
dummy_poll_stats.name="overlay_dummy_poll";
interface->alarm.alarm=gettime_ms()+10;
interface->alarm.deadline=interface->alarm.alarm;
interface->alarm.stats=&dummy_poll_stats;
schedule(&interface->alarm);
} else {
interface->alarm.function=overlay_packetradio_poll;
dummy_poll_stats.name="overlay_packetradio_poll";
interface->alarm.poll.events=POLLIN;
watch(&interface->alarm);
if (config.debug.packetradio)
DEBUGF("Watching file descriptor #%d for packet radio interface",
interface->alarm.poll.fd);
if (interface->tick_ms>0){
// run the first tick asap
interface->alarm.alarm=gettime_ms();
interface->alarm.deadline=interface->alarm.alarm+10;
schedule(&interface->alarm);
}
}
interface->state=INTERFACE_STATE_UP;
if (interface->type!=OVERLAY_INTERFACE_PACKETRADIO)
INFOF("Dummy interface %s addr %s:%d, is up",interface->name,
inet_ntoa(interface->address.sin_addr), ntohs(interface->address.sin_port));
else
INFOF("Packet radio interface %s, is up",interface->name);
directory_registration();
} else {
interface->netmask=ifconfig->dummy_netmask;
interface->broadcast_address.sin_family=AF_INET;
interface->broadcast_address.sin_port = htons(ifconfig->port);
interface->broadcast_address.sin_addr.s_addr = interface->address.sin_addr.s_addr | ~interface->netmask.s_addr;
interface->alarm.function = overlay_interface_poll;
interface_poll_stats.name="overlay_interface_poll";
interface->alarm.stats=&interface_poll_stats;
if (ifconfig->socket_type==SOCK_DGRAM){
interface->address.sin_addr = src_addr;
interface->broadcast_address.sin_addr = broadcast;
interface->netmask = netmask;
interface->address.sin_addr = src_addr;
interface->address.sin_family = AF_INET;
interface->address.sin_port = htons(interface->port);
interface->broadcast_address.sin_addr = broadcast;
interface->broadcast_address.sin_family = AF_INET;
interface->broadcast_address.sin_port = htons(interface->port);
if (overlay_interface_init_socket(overlay_interface_count))
return WHY("overlay_interface_init_socket() failed");
return WHY("overlay_interface_init_socket() failed");
}else{
char read_file[1024];
strbuf d = strbuf_local(read_file, sizeof read_file);
strbuf_path_join(d, serval_instancepath(), config.server.interface_path, ifconfig->file, NULL);
if (strbuf_overrun(d)){
WHYF("interface file name overrun: %s", alloca_str_toprint(strbuf_str(d)));
goto cleanup;
}
if ((interface->alarm.poll.fd = open(read_file,O_APPEND|O_RDWR)) < 1) {
WHYF_perror("could not open interface file %s", read_file);
goto cleanup;
}
if (ifconfig->type==OVERLAY_INTERFACE_PACKETRADIO)
overlay_packetradio_setup_port(interface);
if (ifconfig->socket_type==SOCK_STREAM){
interface->slip_decode_state.dst_offset=0;
interface->alarm.poll.events=POLLIN;
watch(&interface->alarm);
}else if(ifconfig->socket_type==SOCK_FILE){
/* Seek to end of file as initial reading point */
interface->recv_offset = lseek(interface->alarm.poll.fd,0,SEEK_END);
}
}
// schedule the first tick asap
interface->alarm.alarm=gettime_ms();
interface->alarm.deadline=interface->alarm.alarm;
schedule(&interface->alarm);
interface->state=INTERFACE_STATE_UP;
INFOF("Interface %s addr %s:%d, is up",interface->name,
inet_ntoa(interface->address.sin_addr), ntohs(interface->address.sin_port));
directory_registration();
INFOF("Allowing a maximum of %d packets every %lldms", interface->transfer_limit.burst_size, interface->transfer_limit.burst_length);
overlay_interface_count++;
return 0;
cleanup:
if (interface->alarm.poll.fd>=0){
unwatch(&interface->alarm);
close(interface->alarm.poll.fd);
interface->alarm.poll.fd=-1;
}
interface->state=INTERFACE_STATE_DOWN;
return -1;
}
static void interface_read_dgram(struct overlay_interface *interface){
int plen=0;
unsigned char packet[8096];
struct sockaddr src_addr;
socklen_t addrlen = sizeof(src_addr);
/* Read only one UDP packet per call to share resources more fairly, and also
enable stats to accurately count packets received */
int recvttl=1;
plen = recvwithttl(interface->alarm.poll.fd,packet, sizeof(packet), &recvttl, &src_addr, &addrlen);
if (plen == -1) {
WHY_perror("recvwithttl(c)");
overlay_interface_close(interface);
return;
}
/* We have a frame from this interface */
if (config.debug.packetrx)
DEBUG_packet_visualise("Read from real interface", packet,plen);
if (config.debug.overlayinterfaces) {
struct in_addr src = ((struct sockaddr_in *)&src_addr)->sin_addr; // avoid strict-alias warning on Solaris (gcc 4.4)
DEBUGF("Received %d bytes from %s on interface %s",plen,
inet_ntoa(src),
interface->name);
}
if (packetOkOverlay(interface, packet, plen, recvttl, &src_addr, addrlen)) {
WHY("Malformed packet");
// Do we really want to attempt to parse it again?
//DEBUG_packet_visualise("Malformed packet", packet,plen);
}
}
struct file_packet{
struct sockaddr_in src_addr;
struct sockaddr_in dst_addr;
int pid;
int payload_length;
/* TODO ? ;
half-power beam height (uint16)
half-power beam width (uint16)
range in metres, centre beam (uint32)
latitude (uint32)
longitude (uint32)
X/Z direction (uint16)
Y direction (uint16)
speed in metres per second (uint16)
TX frequency in Hz, uncorrected for doppler (which must be done at the receiving end to take into account
relative motion)
coding method (use for doppler response etc) null terminated string
*/
unsigned char payload[1400];
};
static void interface_read_file(struct overlay_interface *interface)
{
/* Grab packets, unpackage and dispatch frames to consumers */
struct file_packet packet;
time_ms_t now = gettime_ms();
/* Read from interface file */
long long length=lseek(interface->alarm.poll.fd,0,SEEK_END);
int new_packets = (length - interface->recv_offset) / sizeof packet;
if (new_packets > 20)
WARNF("Getting behind, there are %d unread packets", new_packets);
if (interface->recv_offset<length){
if (lseek(interface->alarm.poll.fd,interface->recv_offset,SEEK_SET) == -1){
WHY_perror("lseek");
return;
}
if (config.debug.overlayinterfaces)
DEBUGF("Read interface %s (size=%lld) at offset=%d",interface->name, length, interface->recv_offset);
ssize_t nread = read(interface->alarm.poll.fd, &packet, sizeof packet);
if (nread == -1){
WHY_perror("read");
return;
}
if (nread == sizeof packet) {
interface->recv_offset += nread;
if (config.debug.packetrx)
DEBUG_packet_visualise("Read from dummy interface", packet.payload, packet.payload_length);
if (((!interface->drop_unicasts) && memcmp(&packet.dst_addr, &interface->address, sizeof(packet.dst_addr))==0) ||
((!interface->drop_broadcasts) &&
memcmp(&packet.dst_addr, &interface->broadcast_address, sizeof(packet.dst_addr))==0)){
if (packetOkOverlay(interface, packet.payload, packet.payload_length, -1,
(struct sockaddr*)&packet.src_addr, sizeof(packet.src_addr))<0) {
WARN("Unsupported packet from dummy interface");
}
}else if (config.debug.packetrx)
DEBUGF("Ignoring packet addressed to %s:%d", inet_ntoa(packet.dst_addr.sin_addr), ntohs(packet.dst_addr.sin_port));
}
}
/* if there's no input, while we want to check for more soon,
we need to allow all other low priority alarms to fire first,
otherwise we'll dominate the scheduler without accomplishing anything */
if (interface->recv_offset>=length){
if (interface->alarm.alarm == -1 || now + 5 < interface->alarm.alarm){
interface->alarm.alarm = now + 5;
interface->alarm.deadline = interface->alarm.alarm + 10000;
}
}else{
/* keep reading new packets as fast as possible,
but don't completely prevent other high priority alarms */
if (interface->alarm.alarm == -1 || now < interface->alarm.alarm){
interface->alarm.alarm = now;
interface->alarm.deadline = interface->alarm.alarm + 10000;
}
}
}
static void interface_read_stream(struct overlay_interface *interface){
unsigned char buffer[OVERLAY_INTERFACE_RX_BUFFER_SIZE];
ssize_t nread = read(interface->alarm.poll.fd, buffer, OVERLAY_INTERFACE_RX_BUFFER_SIZE);
if (nread == -1){
// WHY_perror("read");
return;
}
struct slip_decode_state *state=&interface->slip_decode_state;
state->src=buffer;
state->src_size=nread;
state->src_offset=0;
while (state->src_offset < state->src_size) {
int ret = slip_decode(state);
if (ret==1){
packetOkOverlay(interface, state->dst, state->dst_offset, -1, NULL, -1);
state->dst_offset=0;
}
}
}
static void write_stream_buffer(overlay_interface *interface){
if (interface->tx_bytes_pending>0) {
int written=write(interface->alarm.poll.fd,interface->txbuffer,
interface->tx_bytes_pending);
if (config.debug.packetradio) DEBUGF("Trying to write %d bytes",
interface->tx_bytes_pending);
if (written>0) {
interface->tx_bytes_pending-=written;
bcopy(&interface->txbuffer[written],&interface->txbuffer[0],
interface->tx_bytes_pending);
if (config.debug.packetradio) DEBUGF("Wrote %d bytes (%d left pending)",
written,interface->tx_bytes_pending);
} else {
if (config.debug.packetradio) DEBUGF("Failed to write any data");
}
}
if (interface->tx_bytes_pending>0) {
// more to write, so keep POLLOUT flag
interface->alarm.poll.events|=POLLOUT;
} else {
// nothing more to write, so clear POLLOUT flag
interface->alarm.poll.events&=~POLLOUT;
// try to empty another packet from the queue ASAP
overlay_queue_schedule_next(gettime_ms());
}
watch(&interface->alarm);
}
static void overlay_interface_poll(struct sched_ent *alarm)
{
struct overlay_interface *interface = (overlay_interface *)alarm;
if (alarm->poll.revents==0){
time_ms_t now = gettime_ms();
if (interface->state==INTERFACE_STATE_UP && interface->tick_ms>0){
if (interface->state==INTERFACE_STATE_UP && interface->tick_ms>0 && now >= interface->last_tick_ms+interface->tick_ms){
// tick the interface
time_ms_t now = gettime_ms();
DEBUG("TICK");
overlay_route_queue_advertisements(interface);
alarm->alarm=now+interface->tick_ms;
alarm->deadline=alarm->alarm+interface->tick_ms/2;
schedule(alarm);
interface->last_tick_ms=now;
alarm->alarm=interface->last_tick_ms+interface->tick_ms;
}else{
alarm->alarm=-1;
}
alarm->deadline=alarm->alarm+interface->tick_ms/2;
return;
switch(interface->socket_type){
case SOCK_DGRAM:
case SOCK_STREAM:
break;
case SOCK_FILE:
interface_read_file(interface);
break;
}
if (alarm->alarm!=-1)
schedule(alarm);
}
if (alarm->poll.revents & POLLOUT){
switch(interface->socket_type){
case SOCK_STREAM:
write_stream_buffer(interface);
break;
case SOCK_DGRAM:
case SOCK_FILE:
//XXX error? fatal?
break;
}
}
if (alarm->poll.revents & POLLIN) {
int plen=0;
unsigned char packet[16384];
struct sockaddr src_addr;
socklen_t addrlen = sizeof(src_addr);
/* Read only one UDP packet per call to share resources more fairly, and also
enable stats to accurately count packets received */
int recvttl=1;
plen = recvwithttl(alarm->poll.fd,packet, sizeof(packet), &recvttl, &src_addr, &addrlen);
if (plen == -1) {
WHY_perror("recvwithttl(c)");
overlay_interface_close(interface);
return;
}
/* We have a frame from this interface */
if (config.debug.packetrx)
DEBUG_packet_visualise("Read from real interface", packet,plen);
if (config.debug.overlayinterfaces) {
struct in_addr src = ((struct sockaddr_in *)&src_addr)->sin_addr; // avoid strict-alias warning on Solaris (gcc 4.4)
DEBUGF("Received %d bytes from %s on interface %s",plen,
inet_ntoa(src),
interface->name);
}
if (packetOkOverlay(interface, packet, plen, recvttl, &src_addr, addrlen)) {
WHY("Malformed packet");
// Do we really want to attempt to parse it again?
//DEBUG_packet_visualise("Malformed packet", packet,plen);
switch(interface->socket_type){
case SOCK_DGRAM:
interface_read_dgram(interface);
break;
case SOCK_STREAM:
interface_read_stream(interface);
break;
case SOCK_FILE:
interface_read_file(interface);
break;
}
}
@ -563,133 +697,47 @@ static void overlay_interface_poll(struct sched_ent *alarm)
}
}
struct dummy_packet{
struct sockaddr_in src_addr;
struct sockaddr_in dst_addr;
int pid;
int payload_length;
/* TODO ? ;
half-power beam height (uint16)
half-power beam width (uint16)
range in metres, centre beam (uint32)
latitude (uint32)
longitude (uint32)
X/Z direction (uint16)
Y direction (uint16)
speed in metres per second (uint16)
TX frequency in Hz, uncorrected for doppler (which must be done at the receiving end to take into account
relative motion)
coding method (use for doppler response etc) null terminated string
*/
unsigned char payload[1400];
};
void overlay_dummy_poll(struct sched_ent *alarm)
{
overlay_interface *interface = (overlay_interface *)alarm;
/* Grab packets, unpackage and dispatch frames to consumers */
struct dummy_packet packet;
time_ms_t now = gettime_ms();
/* Read from dummy interface file */
long long length=lseek(alarm->poll.fd,0,SEEK_END);
int new_packets = (length - interface->recv_offset) / sizeof packet;
if (new_packets > 20)
WARNF("Getting behind, there are %d unread packets", new_packets);
if (interface->recv_offset >= length) {
/* if there's no input, while we want to check for more soon,
we need to allow all other low priority alarms to fire first,
otherwise we'll dominate the scheduler without accomplishing anything */
alarm->alarm = gettime_ms() + 5;
if (interface->last_tick_ms != -1 && alarm->alarm > interface->last_tick_ms + interface->tick_ms)
alarm->alarm = interface->last_tick_ms + interface->tick_ms;
alarm->deadline = alarm->alarm + 10000;
} else {
if (lseek(alarm->poll.fd,interface->recv_offset,SEEK_SET) == -1){
WHY_perror("lseek");
return;
}
if (config.debug.overlayinterfaces)
DEBUGF("Read interface %s (size=%lld) at offset=%d",interface->name, length, interface->recv_offset);
ssize_t nread = read(alarm->poll.fd, &packet, sizeof packet);
if (nread == -1){
WHY_perror("read");
return;
}
if (nread == sizeof packet) {
interface->recv_offset += nread;
if (config.debug.packetrx)
DEBUG_packet_visualise("Read from dummy interface", packet.payload, packet.payload_length);
if (((!interface->drop_unicasts) && memcmp(&packet.dst_addr, &interface->address, sizeof(packet.dst_addr))==0) ||
((!interface->drop_broadcasts) &&
memcmp(&packet.dst_addr, &interface->broadcast_address, sizeof(packet.dst_addr))==0)){
if (packetOkOverlay(interface, packet.payload, packet.payload_length, -1,
(struct sockaddr*)&packet.src_addr, sizeof(packet.src_addr))) {
WARN("Unsupported packet from dummy interface");
}
}else if (config.debug.packetrx)
DEBUGF("Ignoring packet addressed to %s:%d", inet_ntoa(packet.dst_addr.sin_addr), ntohs(packet.dst_addr.sin_port));
}
/* keep reading new packets as fast as possible,
but don't completely prevent other high priority alarms */
if (interface->recv_offset >= length)
alarm->alarm = gettime_ms() + 5;
else
alarm->alarm = gettime_ms();
alarm->deadline = alarm->alarm + 100;
}
// only tick the interface if we've caught up reading all the packets
if (interface->recv_offset >= length &&
interface->tick_ms>0 &&
(interface->last_tick_ms == -1 || now >= interface->last_tick_ms + interface->tick_ms)) {
// tick the interface
overlay_route_queue_advertisements(interface);
interface->last_tick_ms=now;
}
schedule(alarm);
return ;
}
int
overlay_broadcast_ensemble(int interface_number,
overlay_broadcast_ensemble(overlay_interface *interface,
struct sockaddr_in *recipientaddr,
unsigned char *bytes,int len)
{
if (config.debug.packettx)
{
DEBUGF("Sending this packet via interface #%d (len=%d)",interface_number,len);
DEBUGF("Sending this packet via interface %s (len=%d)",interface->name,len);
DEBUG_packet_visualise(NULL,bytes,len);
}
overlay_interface *interface = &overlay_interfaces[interface_number];
if (interface->state!=INTERFACE_STATE_UP){
return WHYF("Cannot send to interface %s as it is down", interface->name);
}
if (interface->type==OVERLAY_INTERFACE_PACKETRADIO)
return WHYF("Cannot send aggregate packets to packet radios");
if (interface->fileP)
switch(interface->socket_type){
case SOCK_STREAM:
{
if (interface->tx_bytes_pending>0)
return WHYF("Cannot send two packets to a stream at the same time");
struct dummy_packet packet={
/* Encode packet with SLIP escaping.
XXX - Add error correction here also */
unsigned char *buffer = interface->txbuffer;
int out_len=0;
int encoded = slip_encode(bytes, len, buffer+out_len, sizeof(interface->txbuffer) - out_len);
if (encoded < 0)
return WHY("Buffer overflow");
out_len+=encoded;
interface->tx_bytes_pending=out_len;
write_stream_buffer(interface);
return 0;
}
case SOCK_FILE:
{
struct file_packet packet={
.src_addr = interface->address,
.dst_addr = *recipientaddr,
.pid = getpid(),
@ -702,8 +750,8 @@ overlay_broadcast_ensemble(int interface_number,
packet.payload_length=len;
bcopy(bytes, packet.payload, len);
/* This lseek() is unneccessary because the dummy file is opened in O_APPEND mode. It's
only purpose is to find out the offset to print in the DEBUG statement. It is vulnerable
to a race condition with other processes appending to the same file. */
only purpose is to find out the offset to print in the DEBUG statement. It is vulnerable
to a race condition with other processes appending to the same file. */
off_t fsize = lseek(interface->alarm.poll.fd, (off_t) 0, SEEK_END);
if (fsize == -1)
return WHY_perror("lseek");
@ -716,7 +764,8 @@ overlay_broadcast_ensemble(int interface_number,
return WHYF("only wrote %lld of %lld bytes", nwrite, sizeof(packet));
return 0;
}
else
case SOCK_DGRAM:
{
if (config.debug.overlayinterfaces)
DEBUGF("Sending %d byte overlay frame on %s to %s",len,interface->name,inet_ntoa(recipientaddr->sin_addr));
@ -731,6 +780,10 @@ overlay_broadcast_ensemble(int interface_number,
}
return 0;
}
default:
return WHY("Unsupported socket type");
}
}
/* Register the real interface, or update the existing interface registration. */
@ -752,7 +805,7 @@ overlay_interface_register(char *name,
int i;
for (i = 0; i < config.interfaces.ac; ++i, ifconfig = NULL) {
ifconfig = &config.interfaces.av[i].value;
if ((!ifconfig->dummy[0])&&ifconfig->type!=OVERLAY_INTERFACE_PACKETRADIO) {
if (ifconfig->socket_type==SOCK_DGRAM) {
int j;
for (j = 0; j < ifconfig->match.patc; ++j){
if (fnmatch(ifconfig->match.patv[j], name, 0) == 0)
@ -836,21 +889,24 @@ void overlay_interface_discover(struct sched_ent *alarm)
const struct config_network_interface *ifconfig = NULL;
for (i = 0; i < config.interfaces.ac; ++i, ifconfig = NULL) {
ifconfig = &config.interfaces.av[i].value;
if (ifconfig->type!=OVERLAY_INTERFACE_PACKETRADIO&&(!ifconfig->dummy[0])) {
if (ifconfig->socket_type==SOCK_DGRAM) {
detect_real_interfaces = 1;
continue;
}
int j;
for (j = 0; j < overlay_interface_count; j++)
if (strcasecmp(overlay_interfaces[j].name, ifconfig->dummy) == 0) {
if (overlay_interfaces[j].state==INTERFACE_STATE_DETECTING)
overlay_interfaces[j].state=INTERFACE_STATE_UP;
for (j = 0; j < overlay_interface_count; j++){
if (overlay_interfaces[i].socket_type == ifconfig->socket_type &&
strcasecmp(overlay_interfaces[j].name, ifconfig->file) == 0 &&
overlay_interfaces[j].state==INTERFACE_STATE_DETECTING){
overlay_interfaces[j].state=INTERFACE_STATE_UP;
break;
}
}
if (j >= overlay_interface_count) {
// New dummy interface, so register it.
struct in_addr dummyaddr = hton_in_addr(INADDR_NONE);
overlay_interface_init(ifconfig->dummy, dummyaddr, dummyaddr, dummyaddr, ifconfig);
overlay_interface_init(ifconfig->file, dummyaddr, dummyaddr, dummyaddr, ifconfig);
}
}

View File

@ -175,7 +175,7 @@ int resolve_name(const char *name, struct in_addr *addr){
DEBUGF("Resolved %s into %s", name, inet_ntoa(*addr));
}else
ret=-1;
ret=WHY("Ignoring non IPv4 address");
freeaddrinfo(addresses);
RETURN(ret);
@ -195,7 +195,7 @@ int load_subscriber_address(struct subscriber *subscriber)
if (*hostc->interface){
interface = overlay_interface_find_name(hostc->interface);
if (!interface)
return -1;
return WHY("Can't fund configured interface");
}
struct sockaddr_in addr;
bzero(&addr, sizeof(addr));
@ -251,8 +251,8 @@ int overlay_send_probe(struct subscriber *peer, struct sockaddr_in addr, overlay
if (!interface)
return WHY("I don't know which interface to use");
// never send unicast probes over packet radio
if (interface->type==OVERLAY_INTERFACE_PACKETRADIO)
// never send unicast probes over a stream interface
if (interface->socket_type==SOCK_STREAM)
return 0;
time_ms_t now = gettime_ms();

View File

@ -26,18 +26,24 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
struct sockaddr_in loopback;
unsigned char magic_header[]={0x00, 0x01};
#define PACKET_UNICAST (1<<0)
#define PACKET_INTERFACE (1<<1)
#define PACKET_SEQ (1<<2)
int overlay_packet_init_header(struct decode_context *context, struct overlay_buffer *buff,
int overlay_packet_init_header(int encapsulation,
struct decode_context *context, struct overlay_buffer *buff,
struct subscriber *destination,
char unicast, char interface, char seq){
if (ob_append_bytes(buff,magic_header,sizeof magic_header))
if (encapsulation !=ENCAP_OVERLAY && encapsulation !=ENCAP_SINGLE)
return WHY("Invalid packet encapsulation");
if (ob_append_byte(buff, 0))
return -1;
if (ob_append_byte(buff, encapsulation))
return -1;
if (overlay_address_append(context, buff, my_subscriber))
return -1;
context->sender = my_subscriber;
@ -136,13 +142,14 @@ int overlay_forward_payload(struct overlay_frame *f){
// may return (HEADER_PROCESS|HEADER_FORWARD) || -1
int parseMdpPacketHeader(struct decode_context *context, struct overlay_frame *frame,
struct overlay_buffer *buffer, struct subscriber **nexthop){
IN();
int process=1;
int forward=2;
time_ms_t now = gettime_ms();
int flags = ob_get(buffer);
if (flags<0)
return WHY("Unable to read flags");
RETURN(WHY("Unable to read flags"));
if (flags & PAYLOAD_FLAG_SENDER_SAME){
if (!context->sender)
@ -151,15 +158,18 @@ int parseMdpPacketHeader(struct decode_context *context, struct overlay_frame *f
}else{
int ret=overlay_address_parse(context, buffer, &frame->source);
if (ret<0)
return WHY("Unable to parse payload source");
if (!frame->source || frame->source->reachable==REACHABLE_SELF)
RETURN(WHY("Unable to parse payload source"));
if (!frame->source || frame->source->reachable==REACHABLE_SELF){
process=forward=0;
if (config.debug.overlayframes)
DEBUGF("Ignoring my packet (or unparsable source)");
}
}
if (flags & PAYLOAD_FLAG_TO_BROADCAST){
if (!(flags & PAYLOAD_FLAG_ONE_HOP)){
if (overlay_broadcast_parse(buffer, &frame->broadcast_id))
return WHY("Unable to read broadcast address");
RETURN(WHY("Unable to read broadcast address"));
if (overlay_broadcast_drop_check(&frame->broadcast_id)){
process=forward=0;
if (config.debug.overlayframes)
@ -170,18 +180,24 @@ int parseMdpPacketHeader(struct decode_context *context, struct overlay_frame *f
}else{
int ret=overlay_address_parse(context, buffer, &frame->destination);
if (ret<0)
return WHY("Unable to parse payload destination");
RETURN(WHY("Unable to parse payload destination"));
if (!frame->destination || frame->destination->reachable!=REACHABLE_SELF)
if (!frame->destination || frame->destination->reachable!=REACHABLE_SELF){
process=0;
if (config.debug.overlayframes)
DEBUGF("Don't process packet not addressed to me");
}
if (!(flags & PAYLOAD_FLAG_ONE_HOP)){
ret=overlay_address_parse(context, buffer, nexthop);
if (ret<0)
return WHY("Unable to parse payload nexthop");
RETURN(WHY("Unable to parse payload nexthop"));
if (!(*nexthop) || (*nexthop)->reachable!=REACHABLE_SELF)
if (!(*nexthop) || (*nexthop)->reachable!=REACHABLE_SELF){
forward=0;
if (config.debug.overlayframes)
DEBUGF("Don't forward packet not addressed to me");
}
}
}
@ -190,18 +206,21 @@ int parseMdpPacketHeader(struct decode_context *context, struct overlay_frame *f
}else{
int ttl_qos = ob_get(buffer);
if (ttl_qos<0)
return WHY("Unable to read ttl");
RETURN(WHY("Unable to read ttl"));
frame->ttl = ttl_qos & 0x1F;
frame->queue = (ttl_qos >> 5) & 3;
}
frame->ttl--;
if (frame->ttl<=0)
if (frame->ttl<=0){
forward=0;
if (config.debug.overlayframes)
DEBUGF("Don't forward when TTL expired");
}
if (flags & PAYLOAD_FLAG_LEGACY_TYPE){
frame->type=ob_get(buffer);
if (frame->type<0)
return WHY("Unable to read type");
RETURN(WHY("Unable to read type"));
}else
frame->type=OF_TYPE_DATA;
@ -211,25 +230,21 @@ int parseMdpPacketHeader(struct decode_context *context, struct overlay_frame *f
frame->source->last_rx = now;
// if we can't understand one of the addresses, skip processing the payload
if (context->invalid_addresses){
//if (config.debug.overlayframes)
DEBUG("Skipping payload due to unknown addresses");
return 0;
if ((forward||process)&&context->invalid_addresses){
if (config.debug.overlayframes)
DEBUG("Don't process or forward with invalid addresses");
forward=process=0;
}
return forward|process;
RETURN(forward|process);
}
int parseEnvelopeHeader(struct decode_context *context, struct overlay_interface *interface,
struct sockaddr_in *addr, struct overlay_buffer *buffer){
IN();
time_ms_t now = gettime_ms();
if (ob_get(buffer)!=magic_header[0] || ob_get(buffer)!=magic_header[1])
return WHY("Packet type not recognised.");
if (overlay_address_parse(context, buffer, &context->sender))
return WHY("Unable to parse sender");
RETURN(WHY("Unable to parse sender"));
int packet_flags = ob_get(buffer);
@ -242,8 +257,11 @@ int parseEnvelopeHeader(struct decode_context *context, struct overlay_interface
if (context->sender){
// ignore packets that have been reflected back to me
if (context->sender->reachable==REACHABLE_SELF)
return 1;
if (context->sender->reachable==REACHABLE_SELF){
if (config.debug.overlayframes)
DEBUG("Completely ignore packets I sent");
RETURN(1);
}
context->sender->last_rx = now;
@ -288,7 +306,7 @@ int parseEnvelopeHeader(struct decode_context *context, struct overlay_interface
else
context->addr=interface->broadcast_address;
}
return 0;
RETURN(0);
}
int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet, size_t len,
@ -367,42 +385,59 @@ int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet, s
if (config.debug.overlayframes)
DEBUG("Received overlay packet");
if (parseEnvelopeHeader(&context, interface, (struct sockaddr_in *)recvaddr, b)<0){
if (ob_get(b)!=0)
RETURN(WHY("Packet type not recognised."));
int encapsulation = ob_get(b);
if (encapsulation !=ENCAP_OVERLAY && encapsulation !=ENCAP_SINGLE)
RETURN(WHY("Invalid packet encapsulation"));
int ret=parseEnvelopeHeader(&context, interface, (struct sockaddr_in *)recvaddr, b);
if (ret){
ob_free(b);
RETURN(-1);
RETURN(ret);
}
while(b->position < b->sizeLimit){
while(ob_remaining(b)>0){
context.invalid_addresses=0;
struct subscriber *nexthop=NULL;
bzero(f.broadcast_id.id, BROADCAST_LEN);
int ret = parseMdpPacketHeader(&context, &f, b, &nexthop);
if (ret<0){
WHY("Header is too short");
int header_valid = parseMdpPacketHeader(&context, &f, b, &nexthop);
if (header_valid<0){
ret = WHY("Header is too short");
break;
}
// TODO allow for one byte length
unsigned int payload_len = ob_get_ui16(b);
if (payload_len > ob_remaining(b)){
WHYF("Invalid payload length (%d)", payload_len);
break;
}
unsigned int payload_len;
switch (encapsulation){
case ENCAP_SINGLE:
payload_len = ob_remaining(b);
break;
case ENCAP_OVERLAY:
payload_len = ob_get_ui16(b);
if (payload_len > ob_remaining(b)){
ret = WHYF("Invalid payload length (%d)", payload_len);
goto end;
}
break;
}
int next_payload = ob_position(b) + payload_len;
if (ret!=0){
if (config.debug.overlayframes){
DEBUGF("Received payload type %x, len %d", f.type, next_payload - b->position);
DEBUGF("Payload from %s", alloca_tohex_sid(f.source->sid));
DEBUGF("Payload to %s", (f.destination?alloca_tohex_sid(f.destination->sid):"broadcast"));
if (!is_all_matching(f.broadcast_id.id, BROADCAST_LEN, 0))
DEBUGF("Broadcast id %s", alloca_tohex(f.broadcast_id.id, BROADCAST_LEN));
if (nexthop)
DEBUGF("Next hop %s", alloca_tohex_sid(nexthop->sid));
}
if (config.debug.overlayframes){
DEBUGF("Received payload type %x, len %d", f.type, payload_len);
DEBUGF("Payload from %s", f.source?alloca_tohex_sid(f.source->sid):"NULL");
DEBUGF("Payload to %s", (f.destination?alloca_tohex_sid(f.destination->sid):"broadcast"));
if (!is_all_matching(f.broadcast_id.id, BROADCAST_LEN, 0))
DEBUGF("Broadcast id %s", alloca_tohex(f.broadcast_id.id, BROADCAST_LEN));
if (nexthop)
DEBUGF("Next hop %s", alloca_tohex_sid(nexthop->sid));
}
if (header_valid!=0){
f.payload = ob_slice(b, b->position, payload_len);
if (!f.payload){
@ -414,11 +449,11 @@ int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet, s
ob_limitsize(f.payload, payload_len);
// forward payloads that are for someone else or everyone
if (ret&HEADER_FORWARD)
if (header_valid&HEADER_FORWARD)
overlay_forward_payload(&f);
// process payloads that are for me or everyone
if (ret&HEADER_PROCESS)
if (header_valid&HEADER_PROCESS)
process_incoming_frame(now, interface, &f, &context);
}
@ -430,9 +465,10 @@ int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet, s
b->position=next_payload;
}
end:
send_please_explain(&context, my_subscriber, context.sender);
ob_free(b);
RETURN(0);
RETURN(ret);
}

View File

@ -1,20 +1,7 @@
#include "serval.h"
#include "conf.h"
#include "overlay_address.h"
#include "overlay_buffer.h"
#include "overlay_packet.h"
#include <termios.h>
/* interface decoder states. broadly based on RFC1055 */
#define DC_NORMAL 0
#define DC_ESC 1
/* SLIP-style escape characters used for serial packet radio interfaces */
#define SLIP_END 0xc0
#define SLIP_ESC 0xdb
#define SLIP_ESC_END 0xdc
#define SLIP_ESC_ESC 0xdd
int overlay_packetradio_setup_port(overlay_interface *interface)
{
struct termios t;
@ -44,288 +31,11 @@ int overlay_packetradio_setup_port(overlay_interface *interface)
tcsetattr(interface->alarm.poll.fd, TCSANOW, &t);
write(interface->alarm.poll.fd,"ATO\r",4);
if (config.debug.packetradio)
DEBUGF("Sent ATO to make sure we are in on-line mode");
set_nonblock(interface->alarm.poll.fd);
return 0;
}
int overlay_rx_packet_complete(overlay_interface *interface)
{
if (interface->recv_offset) {
struct decode_context context;
struct overlay_buffer *buffer=ob_static(interface->rxbuffer, interface->recv_offset);
ob_limitsize(buffer, interface->recv_offset);
struct overlay_frame frame;
struct subscriber *next_hop=NULL;
bzero(&context, sizeof(struct decode_context));
bzero(&frame, sizeof(struct overlay_frame));
if (parseEnvelopeHeader(&context, interface, NULL, buffer))
goto end;
int packetFlags = parseMdpPacketHeader(&context, &frame, buffer, &next_hop);
if (packetFlags<=0)
goto end;
frame.payload = ob_slice(buffer, ob_position(buffer), ob_remaining(buffer));
ob_limitsize(frame.payload, ob_remaining(buffer));
// forward payloads that are for someone else or everyone
if (packetFlags&HEADER_FORWARD)
overlay_forward_payload(&frame);
// process payloads that are for me or everyone
if (packetFlags&HEADER_PROCESS)
process_incoming_frame(gettime_ms(), interface, &frame, &context);
ob_free(frame.payload);
end:
send_please_explain(&context, my_subscriber, context.sender);
ob_free(buffer);
}
interface->recv_offset=0;
return 0;
}
int overlay_rx_packet_append_byte(overlay_interface *interface,unsigned char byte)
{
// Make sure we don't put the data outside the RX buffer
if (interface->recv_offset<0
||interface->recv_offset>=OVERLAY_INTERFACE_RX_BUFFER_SIZE)
interface->recv_offset=0;
interface->rxbuffer[interface->recv_offset++]=byte;
if (interface->recv_offset==OVERLAY_INTERFACE_RX_BUFFER_SIZE) {
// packet fills buffer. Who knows, we might be able to decode what we
// have of it.
return overlay_rx_packet_complete(interface);
}
return 0;
}
static void write_buffer(overlay_interface *interface){
if (interface->tx_bytes_pending>0) {
int written=write(interface->alarm.poll.fd,interface->txbuffer,
interface->tx_bytes_pending);
if (config.debug.packetradio) DEBUGF("Trying to write %d bytes",
interface->tx_bytes_pending);
if (written>0) {
interface->tx_bytes_pending-=written;
bcopy(&interface->txbuffer[written],&interface->txbuffer[0],
interface->tx_bytes_pending);
if (config.debug.packetradio) DEBUGF("Wrote %d bytes (%d left pending)",
written,interface->tx_bytes_pending);
} else {
if (config.debug.packetradio) DEBUGF("Failed to write any data");
}
}
if (interface->tx_bytes_pending>0) {
// more to write, so keep POLLOUT flag
interface->alarm.poll.events|=POLLOUT;
} else {
// nothing more to write, so clear POLLOUT flag
interface->alarm.poll.events&=~POLLOUT;
// try to empty another packet from the queue ASAP
overlay_queue_schedule_next(gettime_ms());
}
watch(&interface->alarm);
}
void overlay_packetradio_poll(struct sched_ent *alarm)
{
overlay_interface *interface = (overlay_interface *)alarm;
time_ms_t now = gettime_ms();
if (alarm->poll.revents==0){
if (interface->state==INTERFACE_STATE_UP &&
(interface->last_tick_ms==-1 || interface->last_tick_ms + interface->tick_ms<now)){
// tick the interface
overlay_route_queue_advertisements(interface);
interface->last_tick_ms=now;
}
alarm->alarm=interface->last_tick_ms + interface->tick_ms;
alarm->deadline=alarm->alarm + interface->tick_ms/2;
unschedule(alarm);
schedule(alarm);
return;
}
if (alarm->poll.revents&POLLOUT){
write_buffer(interface);
}
// Read data from the serial port
if (alarm->poll.revents&POLLIN){
unsigned char buffer[OVERLAY_INTERFACE_RX_BUFFER_SIZE];
ssize_t nread = read(alarm->poll.fd, buffer, OVERLAY_INTERFACE_RX_BUFFER_SIZE);
if (nread == -1){
// WHY_perror("read");
return;
}
if (nread>0) {
/*
Examine received bytes for end of packet marker.
The challenge is that we need to make sure that the packet encapsulation
is self-synchronising in the event that a data error occurs (including
failure to receive an arbitrary number of bytes).
For now we will reuse the functional but sub-optimal method described in
RFC1055 for SLIP.
*/
int i;
for(i=0;i<nread;i++)
{
switch (interface->decoder_state) {
case DC_ESC:
// escaped character
interface->decoder_state=DC_NORMAL;
switch(buffer[i]) {
case SLIP_ESC_END: // escaped END byte
overlay_rx_packet_append_byte(interface,SLIP_END);
break;
case SLIP_ESC_ESC: // escaped escape character
overlay_rx_packet_append_byte(interface,SLIP_ESC);
break;
default: /* Unknown escape character. This is an error. */
if (config.debug.packetradio)
WARNF("Packet radio stream contained illegal escaped byte 0x%02x -- ignoring packet.",buffer[i]);
// interface->recv_offset=0;
break;
}
break;
default:
// non-escape character
switch(buffer[i]) {
case SLIP_ESC:
interface->decoder_state=DC_ESC;
break;
case SLIP_END:
overlay_rx_packet_complete(interface);
break;
default:
overlay_rx_packet_append_byte(interface,buffer[i]);
}
break;
}
}
}
}
return ;
}
static int encode(unsigned char *src, int src_bytes, unsigned char *dst, int dst_len){
int offset=0;
int i;
for (i=0;i<src_bytes;i++){
if (offset+2>dst_len)
return WHY("Buffer overflow while encoding frame");
switch(src[i]) {
case SLIP_END:
dst[offset++]=SLIP_ESC;
dst[offset++]=SLIP_ESC_END;
break;
case SLIP_ESC:
dst[offset++]=SLIP_ESC;
dst[offset++]=SLIP_ESC_ESC;
break;
default:
dst[offset++]=src[i];
}
}
return offset;
}
int overlay_packetradio_tx_packet(struct overlay_frame *frame)
{
/*
This is a bit interesting, because we have to deal with RTS/CTS potentially
blocking our writing of the packet.
For now, just try to write it, and if we only write part of it, then so be it.
We will surround each packet with SLIP END characters, so we should be able to
deal with such truncation in a fairly sane manner.
*/
overlay_interface *interface=frame->interface;
int interface_number = interface - overlay_interfaces;
if (frame->payload->position>OVERLAY_INTERFACE_RX_BUFFER_SIZE)
return WHYF("Not sending over-size packet");
if (interface->tx_bytes_pending>0)
return WHYF("Cannot send two packets at the same time");
struct overlay_buffer *headers=ob_new();
if (!headers)
return WHY("could not allocate overlay buffer for headers");
struct decode_context context;
bzero(&context, sizeof(struct decode_context));
if (frame->source_full)
my_subscriber->send_full=1;
if (overlay_packet_init_header(&context, headers, NULL, 0, interface_number, 0))
goto cleanup;
struct broadcast *broadcast=NULL;
if ((!frame->destination) && !is_all_matching(frame->broadcast_id.id,BROADCAST_LEN,0))
broadcast = &frame->broadcast_id;
if (overlay_frame_build_header(&context, headers,
frame->queue, frame->type,
frame->modifiers, frame->ttl,
broadcast, frame->next_hop,
frame->destination, frame->source))
goto cleanup;
/* Encode packet with SLIP escaping.
XXX - Add error correction here also */
unsigned char *buffer = interface->txbuffer;
int out_len=0;
buffer[out_len++]=SLIP_END;
int encoded=encode(headers->bytes, headers->position,
buffer+out_len, sizeof(interface->txbuffer) - out_len);
if (encoded<0){
WHY("Ran out of buffer space while encoding headers");
goto cleanup;
}
out_len+=encoded;
encoded=encode(frame->payload->bytes, frame->payload->position,
buffer+out_len, sizeof(interface->txbuffer) - out_len);
if (encoded<0){
WHY("Ran out of buffer space while encoding payload body");
goto cleanup;
}
out_len+=encoded;
buffer[out_len++]=SLIP_END;
if (config.debug.packetradio){
DEBUGF("Encoded length is %d",out_len);
}
interface->tx_bytes_pending=out_len;
write_buffer(interface);
ob_free(headers);
return 0;
cleanup:
ob_free(headers);
return -1;
}

View File

@ -104,7 +104,7 @@ int overlay_frame_append_payload(struct decode_context *context, overlay_interfa
if (ob_append_ui16(b, ob_position(p->payload)))
goto cleanup;
if (ob_append_bytes(b,p->payload->bytes,p->payload->position)) {
if (ob_append_bytes(b, ob_ptr(p->payload), ob_position(p->payload))) {
WHY("could not append payload");
goto cleanup;
}
@ -115,7 +115,36 @@ cleanup:
ob_rewind(b);
return -1;
}
int single_packet_encapsulation(struct overlay_buffer *b, struct overlay_frame *frame){
overlay_interface *interface=frame->interface;
int interface_number = interface - overlay_interfaces;
struct decode_context context;
bzero(&context, sizeof(struct decode_context));
if (frame->source_full)
my_subscriber->send_full=1;
if (overlay_packet_init_header(ENCAP_SINGLE, &context, b, NULL, 0, interface_number, 0))
return -1;
struct broadcast *broadcast=NULL;
if ((!frame->destination) && !is_all_matching(frame->broadcast_id.id,BROADCAST_LEN,0))
broadcast = &frame->broadcast_id;
if (overlay_frame_build_header(&context, b,
frame->queue, frame->type,
frame->modifiers, frame->ttl,
broadcast, frame->next_hop,
frame->destination, frame->source))
return -1;
if (ob_append_buffer(b, frame->payload))
return -1;
return 0;
}
int op_free(struct overlay_frame *p)
{
if (!p) return WHY("Asked to free NULL");

View File

@ -243,7 +243,8 @@ overlay_init_packet(struct outgoing_packet *packet, struct subscriber *destinati
packet->unicast_subscriber = destination;
ob_limitsize(packet->buffer, packet->interface->mtu);
overlay_packet_init_header(&packet->context, packet->buffer, destination, unicast, packet->i, 0);
overlay_packet_init_header(ENCAP_OVERLAY, &packet->context, packet->buffer,
destination, unicast, packet->i, 0);
packet->header_length = ob_position(packet->buffer);
}
@ -417,24 +418,33 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
}
if (!packet->buffer){
// TODO, interface flag for this behaviour?
if (frame->interface->type==OVERLAY_INTERFACE_PACKETRADIO){
// don't transmit if the serial tx buffer has data
if (frame->interface->socket_type==SOCK_STREAM){
// skip this interface if the stream tx buffer has data
if (frame->interface->tx_bytes_pending>0)
goto skip;
// send packets without aggregating them together
if (overlay_packetradio_tx_packet(frame))
goto skip;
goto sent;
}
// can we send a packet on this interface now?
if (limit_is_allowed(&frame->interface->transfer_limit))
goto skip;
if (frame->interface->encapsulation==ENCAP_SINGLE){
// send MDP packets without aggregating them together
struct overlay_buffer *buff = ob_new();
int ret=single_packet_encapsulation(buff, frame);
if (!ret){
ret=overlay_broadcast_ensemble(frame->interface, &frame->recvaddr, ob_ptr(buff), ob_position(buff));
}
ob_free(buff);
if (ret)
goto skip;
goto sent;
}
if (frame->source_full)
my_subscriber->send_full=1;
overlay_init_packet(packet, frame->next_hop, frame->unicast, frame->interface, frame->recvaddr);
@ -518,7 +528,7 @@ overlay_fill_send_packet(struct outgoing_packet *packet, time_ms_t now) {
if (config.debug.packetconstruction)
ob_dump(packet->buffer,"assembled packet");
if (overlay_broadcast_ensemble(packet->i, &packet->dest, ob_ptr(packet->buffer), ob_position(packet->buffer))){
if (overlay_broadcast_ensemble(packet->interface, &packet->dest, ob_ptr(packet->buffer), ob_position(packet->buffer))){
// sendto failed. We probably don't have a valid route
if (packet->unicast_subscriber){
set_reachable(packet->unicast_subscriber, REACHABLE_NONE);

View File

@ -734,6 +734,8 @@ rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct
/* Prepare for fetching via HTTP */
slot->peer_ipandport = *peerip;
slot->alarm.poll.fd=-1;
strbuf r = strbuf_local(slot->request, sizeof slot->request);
strbuf_sprintf(r, "GET /rhizome/file/%s HTTP/1.0\r\n\r\n", m->fileHexHash);
if (strbuf_overrun(r))

View File

@ -358,20 +358,38 @@ extern int overlayMode;
// larger.
#define OVERLAY_INTERFACE_TX_BUFFER_SIZE (2+2048*2)
struct slip_decode_state{
int state;
unsigned char *src;
int src_size;
unsigned char dst[OVERLAY_INTERFACE_RX_BUFFER_SIZE];
int src_offset;
int dst_offset;
};
typedef struct overlay_interface {
struct sched_ent alarm;
char name[256];
unsigned char rxbuffer[OVERLAY_INTERFACE_RX_BUFFER_SIZE];
int recv_offset; /* either dummy file offset or number of bytes in RX buffer
for packet radio interfaces */
int recv_offset; /* file offset */
unsigned char txbuffer[OVERLAY_INTERFACE_RX_BUFFER_SIZE];
int tx_bytes_pending;
char decoder_state; // decoder state for packet radio interfaces
int fileP; // non-zero for dummy and packet radio serial interfaces
struct slip_decode_state slip_decode_state;
// copy of ifconfig flags
char drop_broadcasts;
char drop_unicasts;
int port;
int type;
int socket_type;
int encapsulation;
char send_broadcasts;
char prefer_unicast;
// can we use this interface for routes to addresses in other subnets?
int default_route;
/* Number of milli-seconds per tick for this interface, which is basically related to the
the typical TX range divided by the maximum expected speed of nodes in the network.
This means that short-range communications has a higher bandwidth requirement than
@ -384,10 +402,8 @@ typedef struct overlay_interface {
These figures will be refined over time, and we will allow people to set them per-interface.
*/
unsigned tick_ms; /* milliseconds per tick */
struct subscriber *next_advert;
char send_broadcasts;
char prefer_unicast;
struct subscriber *next_advert;
/* The time of the last tick on this interface in milli seconds */
time_ms_t last_tick_ms;
@ -407,8 +423,6 @@ typedef struct overlay_interface {
struct sockaddr_in address;
struct sockaddr_in broadcast_address;
struct in_addr netmask;
// can we use this interface for routes to addresses in other subnets?
int default_route;
/* Not necessarily the real MTU, but the largest frame size we are willing to TX on this interface.
For radio links the actual maximum and the maximum that is likely to be delivered reliably are
@ -483,7 +497,9 @@ time_ms_t overlay_time_until_next_tick();
int overlay_frame_append_payload(struct decode_context *context, overlay_interface *interface,
struct overlay_frame *p, struct overlay_buffer *b);
int overlay_packet_init_header(struct decode_context *context, struct overlay_buffer *buff,
int single_packet_encapsulation(struct overlay_buffer *b, struct overlay_frame *frame);
int overlay_packet_init_header(int encapsulation,
struct decode_context *context, struct overlay_buffer *buff,
struct subscriber *destination,
char unicast, char interface, char seq);
int overlay_frame_build_header(struct decode_context *context, struct overlay_buffer *buff,
@ -674,9 +690,10 @@ int overlay_interface_register(char *name,
overlay_interface * overlay_interface_get_default();
overlay_interface * overlay_interface_find(struct in_addr addr, int return_default);
overlay_interface * overlay_interface_find_name(const char *name);
int overlay_broadcast_ensemble(int interface_number,
int
overlay_broadcast_ensemble(overlay_interface *interface,
struct sockaddr_in *recipientaddr,
unsigned char *bytes,int len);
unsigned char *bytes,int len);
int directory_registration();
int directory_service_init();
@ -821,4 +838,7 @@ uint64_t read_uint64(unsigned char *o);
uint32_t read_uint32(unsigned char *o);
uint16_t read_uint16(unsigned char *o);
int slip_encode(unsigned char *src, int src_bytes, unsigned char *dst, int dst_len);
int slip_decode(struct slip_decode_state *state);
#endif // __SERVALD_SERVALD_H

View File

@ -56,6 +56,7 @@ SERVAL_SOURCES = $(SERVAL_BASE)audiodevices.c \
$(SERVAL_BASE)server.c \
$(SERVAL_BASE)sha2.c \
$(SERVAL_BASE)sighandlers.c \
$(SERVAL_BASE)slip.c \
$(SERVAL_BASE)sqlite-amalgamation-3070900/sqlite3.c \
$(SERVAL_BASE)srandomdev.c \
$(SERVAL_BASE)str.c \

View File

@ -616,7 +616,7 @@ configure_servald_server() {
}
# Utility function:
# - start a set of servald server processes running on a shared dummy interface
# - start a set of servald server processes running on a shared file interface
# and with its own private monitor and MDP abstract socket names
# - set variables DUMMYx to the full path name of shared dummy interface
# - set variables LOGx to the full path of server log file for instance x: LOGA,
@ -638,9 +638,7 @@ start_servald_instances() {
# These config settings can be overridden in a caller-supplied configure_servald_server().
# They are extremely useful for the majority of fixtures.
executeOk_servald config \
set interfaces.1.dummy "$DUMMYNET" \
set interfaces.1.dummy_address 127.0.0.1 \
set interfaces.1.dummy_netmask 255.255.255.0 \
set interfaces.1.file "$DUMMYNET" \
set monitor.socket "org.servalproject.servald.monitor.socket.$TFWUNIQUE.$instance_name" \
set mdp.socket "org.servalproject.servald.mdp.socket.$TFWUNIQUE.$instance_name"
configure_servald_server

View File

@ -244,20 +244,20 @@ test_InterfacesModernMatch() {
executeOk_servald config set interfaces.0.match 'eth*, wifi*'
}
doc_InterfacesModernDummy="Modern 'interfaces.N.dummy' config option"
doc_InterfacesModernDummy="Modern 'interfaces.N.file' config option"
test_InterfacesModernDummy() {
executeOk_servald config set interfaces.0.dummy dummyname
executeOk_servald config set interfaces.0.file dummyname
}
teardown_InterfacesModernDummy() {
tfw_cat $SERVALINSTANCE_PATH/serval.conf
teardown
}
doc_InterfacesModernIncompatible="Config options 'interfaces.match' and 'interfaces.dummy' are incompatible"
doc_InterfacesModernIncompatible="Config options 'interfaces.match' and 'interfaces.file' are incompatible"
test_InterfacesModernIncompatible() {
executeOk_servald config set interfaces.0.match eth
execute --stderr --core-backtrace --executable=$servald \
config set interfaces.0.dummy dummy
config set interfaces.0.file dummy
assertExitStatus '==' 2
assert_stderr_log \
--warn-pattern='"interfaces\.0\..*incompatible' \

View File

@ -88,13 +88,13 @@ test_publish() {
}
interface_up() {
grep "Dummy interface .* is up" $instance_servald_log || return 1
grep "Interface .* is up" $instance_servald_log || return 1
return 0
}
start_routing_instance() {
executeOk_servald config \
set server.dummy_interface_dir "$SERVALD_VAR" \
set server.interface_path "$SERVALD_VAR" \
set monitor.socket "org.servalproject.servald.monitor.socket.$TFWUNIQUE.$instance_name" \
set mdp.socket "org.servalproject.servald.mdp.socket.$TFWUNIQUE.$instance_name" \
set rhizome.enable No \
@ -112,7 +112,7 @@ setup_routing() {
>$SERVALD_VAR/dummy1
foreach_instance +A +B +C \
executeOk_servald config \
set interfaces.0.dummy dummy1 \
set interfaces.0.file dummy1 \
set interfaces.0.mdp_tick_ms 0 \
set interfaces.0.default_route 1 \
set interfaces.0.dummy_netmask 255.255.255.0

View File

@ -24,19 +24,19 @@ source "${0%/*}/../testdefs.sh"
add_interface() {
>$SERVALD_VAR/dummy$1
executeOk_servald config \
set interfaces.$1.dummy dummy$1 \
set interfaces.$1.file dummy$1 \
set interfaces.$1.dummy_address 127.0.$1.$instance_number \
set interfaces.$1.dummy_netmask 255.255.255.224
}
interface_up() {
grep "Dummy interface .* is up" $instance_servald_log || return 1
grep "Interface .* is up" $instance_servald_log || return 1
return 0
}
start_routing_instance() {
executeOk_servald config \
set server.dummy_interface_dir "$SERVALD_VAR" \
set server.interface_path "$SERVALD_VAR" \
set monitor.socket "org.servalproject.servald.monitor.socket.$TFWUNIQUE.$instance_name" \
set mdp.socket "org.servalproject.servald.mdp.socket.$TFWUNIQUE.$instance_name" \
set log.show_pid on \
@ -78,6 +78,44 @@ test_single_link() {
assertStdoutGrep --matches=1 "^$SIDB:BROADCAST UNICAST :"
}
doc_single_mdp="Use single MDP per packet encapsulation"
setup_single_mdp() {
setup_servald
assert_no_servald_processes
foreach_instance +A +B create_single_identity
foreach_instance +A +B add_interface 1
foreach_instance +A +B executeOk_servald config set interfaces.1.encapsulation single
foreach_instance +A +B start_routing_instance
}
test_single_mdp() {
foreach_instance +A +B \
wait_until has_seen_instances +A +B
set_instance +A
executeOk_servald mdp ping $SIDB 1
tfw_cat --stdout --stderr
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDB:BROADCAST UNICAST :"
}
doc_mismatched_encap="Mismatched MDP packet encapsulation"
setup_mismatched_encap() {
setup_servald
assert_no_servald_processes
foreach_instance +A +B create_single_identity
foreach_instance +A +B add_interface 1
foreach_instance +A executeOk_servald config set interfaces.1.encapsulation single
foreach_instance +A +B start_routing_instance
}
test_mismatched_encap() {
foreach_instance +A +B \
wait_until has_seen_instances +A +B
set_instance +A
executeOk_servald mdp ping $SIDB 1
tfw_cat --stdout --stderr
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDB:BROADCAST UNICAST :"
}
doc_multiple_nodes="Multiple nodes on one link"
setup_multiple_nodes() {
setup_servald
@ -113,7 +151,7 @@ setup_scan() {
set interfaces.1.dummy_address 127.0.1.11
foreach_instance +A +B \
executeOk_servald config \
set interfaces.1.dummy_filter_broadcasts 1
set interfaces.1.drop_broadcasts 1
foreach_instance +A +B start_routing_instance
}
test_scan() {
@ -138,8 +176,8 @@ setup_broadcast_only() {
assert_no_servald_processes
foreach_instance +A +B create_single_identity
foreach_instance +A +B add_interface 1
foreach_instance +A +B \
executeOk_servald config set interfaces.1.dummy_filter_unicasts 1
foreach_instance +A +B \
executeOk_servald config set interfaces.1.drop_unicasts 1
foreach_instance +A +B start_routing_instance
}
test_broadcast_only() {