Allow for interfaces that don't tick, pack rhizome manifests into all packets

This commit is contained in:
Jeremy Lakeman 2012-08-30 09:34:52 +09:30
parent 1f6607af14
commit 1b91724da2
3 changed files with 77 additions and 35 deletions

View File

@ -252,6 +252,19 @@ error:
return -1;
}
overlay_interface * overlay_interface_find(struct in_addr addr){
int i;
for (i=0;i<OVERLAY_MAX_INTERFACES;i++){
if (overlay_interfaces[i].state!=INTERFACE_STATE_UP)
continue;
if ((overlay_interfaces[i].netmask.s_addr & addr.s_addr) == (overlay_interfaces[i].netmask.s_addr & overlay_interfaces[i].address.sin_addr.s_addr)){
return &overlay_interfaces[i];
}
}
return NULL;
}
// OSX doesn't recieve broadcast packets on sockets bound to an interface's address
// So we have to bind a socket to INADDR_ANY to receive these packets.
static void
@ -259,7 +272,6 @@ overlay_interface_read_any(struct sched_ent *alarm){
if (alarm->poll.revents & POLLIN) {
int plen=0;
int recvttl=1;
int i;
unsigned char packet[16384];
overlay_interface *interface=NULL;
struct sockaddr src_addr;
@ -278,17 +290,9 @@ overlay_interface_read_any(struct sched_ent *alarm){
struct in_addr src = ((struct sockaddr_in *)&src_addr)->sin_addr;
/* Try to identify the real interface that the packet arrived on */
for (i=0;i<OVERLAY_MAX_INTERFACES;i++){
if (overlay_interfaces[i].state!=INTERFACE_STATE_UP)
continue;
// TODO test netmask...
if ((overlay_interfaces[i].netmask.s_addr & src.s_addr) == (overlay_interfaces[i].netmask.s_addr & overlay_interfaces[i].address.sin_addr.s_addr)){
interface = &overlay_interfaces[i];
break;
}
}
interface = overlay_interface_find(src);
/* Should we drop the packet if we don't find a match? */
/* Drop the packet if we don't find a match */
if (!interface){
if (debug&DEBUG_OVERLAYINTERFACES)
DEBUGF("Could not find matching interface for packet received from %s", inet_ntoa(src));
@ -385,10 +389,12 @@ overlay_interface_init_socket(int interface_index)
interface->alarm.stats=&interface_poll_stats;
watch(&interface->alarm);
// run the first tick asap
interface->alarm.alarm=gettime_ms();
interface->alarm.deadline=interface->alarm.alarm+10;
schedule(&interface->alarm);
if (interface->tick_ms>0){
// run the first tick asap
interface->alarm.alarm=gettime_ms();
interface->alarm.deadline=interface->alarm.alarm+10;
schedule(&interface->alarm);
}
interface->state=INTERFACE_STATE_UP;
@ -417,6 +423,8 @@ overlay_interface_init(char *name, struct in_addr src_addr, struct in_addr netma
interface->type=type;
interface->last_tick_ms= -1; // not ticked yet
interface->alarm.poll.fd=0;
// how often do we announce ourselves on this interface?
switch (type) {
case OVERLAY_INTERFACE_PACKETRADIO:
interface->tick_ms = confValueGetInt64Range("mdp.packetradio.tick_ms", 15000LL, 1LL, 3600000LL);
@ -434,6 +442,19 @@ overlay_interface_init(char *name, struct in_addr src_addr, struct in_addr netma
return WHYF("Unsupported interface type %d", type);
}
// allow for a per interface override
{
char option_name[64];
snprintf(option_name, sizeof(option_name), "mdp.%s.tick_ms", name);
interface->tick_ms = confValueGetInt64Range(option_name, interface->tick_ms, 1LL, 3600000LL);
}
// disable announcements and other broadcasts if tick_ms=0.
if (interface->tick_ms>0)
interface->send_broadcasts=1;
else
interface->send_broadcasts=0;
if (name[0]=='>') {
interface->fileP=1;
char dummyfile[1024];
@ -491,7 +512,7 @@ static void overlay_interface_poll(struct sched_ent *alarm)
if (alarm->poll.revents==0){
if (interface->state==INTERFACE_STATE_UP){
if (interface->state==INTERFACE_STATE_UP && interface->tick_ms>0){
// tick the interface
time_ms_t now = gettime_ms();
int i = (interface - overlay_interfaces);
@ -552,7 +573,7 @@ void overlay_dummy_poll(struct sched_ent *alarm)
unsigned char transaction_id[8];
time_ms_t now = gettime_ms();
if (interface->last_tick_ms == -1 || now >= interface->last_tick_ms + interface->tick_ms) {
if (interface->tick_ms>0 && (interface->last_tick_ms == -1 || now >= interface->last_tick_ms + interface->tick_ms)) {
// tick the interface
int i = (interface - overlay_interfaces);
overlay_tick_interface(i, now);
@ -698,7 +719,11 @@ overlay_broadcast_ensemble(int interface_number,
else
{
if(sendto(interface->alarm.poll.fd,
<<<<<<< HEAD
bytes, len, 0, (struct sockaddr *)&s, sizeof(struct sockaddr_in)) != len){
=======
bytes, len, 0, (struct sockaddr *)recipientaddr, sizeof(struct sockaddr_in)) != len){
>>>>>>> 7e557b4... Don't send any broadcasts if tick_ms=0
WHY_perror("sendto(c)");
overlay_interface_close(interface);
return -1;
@ -1020,11 +1045,11 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
int i;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
{
if (overlay_interfaces[i].state==INTERFACE_STATE_UP)
if (!frame->broadcast_sent_via[i]){
overlay_init_packet(packet, &overlay_interfaces[i], overlay_interfaces[i].broadcast_address);
break;
}
if (overlay_interfaces[i].state==INTERFACE_STATE_UP
&& !frame->broadcast_sent_via[i]){
overlay_init_packet(packet, &overlay_interfaces[i], overlay_interfaces[i].broadcast_address);
break;
}
}
if (!packet->buffer){
@ -1109,13 +1134,18 @@ overlay_fill_send_packet(struct outgoing_packet *packet, time_ms_t now) {
if(packet->buffer){
// send the packet
if (packet->buffer->position>=HEADERFIELDS_LEN){
// stuff rhizome announcements at the last moment
if (rhizome_enabled() && rhizome_http_server_running()){
overlay_rhizome_add_advertisements(packet->i,packet->buffer);
}
if (debug&DEBUG_PACKETCONSTRUCTION)
dump("assembled packet",&packet->buffer->bytes[0],packet->buffer->position);
if (debug&DEBUG_OVERLAYINTERFACES)
DEBUGF("Sending %d byte packet",packet->buffer->position);
overlay_broadcast_ensemble(packet->i,NULL,packet->buffer->bytes,packet->buffer->position);
overlay_broadcast_ensemble(packet->i, &packet->dest, packet->buffer->bytes, packet->buffer->position);
}
ob_free(packet->buffer);
overlay_address_clear();
@ -1166,9 +1196,6 @@ overlay_tick_interface(int i, time_ms_t now) {
/* Add advertisements for ROUTES */
overlay_route_add_advertisements(packet.buffer);
if (rhizome_enabled() && rhizome_http_server_running())
overlay_rhizome_add_advertisements(i,packet.buffer);
/* Stuff more payloads from queues and send it */
overlay_fill_send_packet(&packet, now);
RETURN(0);

View File

@ -193,7 +193,8 @@ int overlay_payload_enqueue(int q, struct overlay_frame *p)
alloca_tohex(p->destination->sid, 7),
q,overlay_tx[q].length);
if (q<0||q>=OQ_MAX) return WHY("Invalid queue specified");
if (q<0||q>=OQ_MAX)
return WHY("Invalid queue specified");
if (p->payload && p->payload->position > p->payload->sizeLimit){
@ -202,18 +203,28 @@ int overlay_payload_enqueue(int q, struct overlay_frame *p)
p->payload->sizeLimit=p->payload->position;
}
if (0) dump_payload(p,"queued for delivery");
if (overlay_tx[q].length>=overlay_tx[q].maxLength)
return WHYF("Queue #%d congested (size = %d)",q,overlay_tx[q].maxLength);
if (0) dump_queue("before",q);
if (!p->destination){
int i;
int drop=1;
// make sure there is an interface up that allows broadcasts
for(i=0;i<OVERLAY_MAX_INTERFACES;i++){
if (overlay_interfaces[i].state==INTERFACE_STATE_UP
&& overlay_interfaces[i].send_broadcasts){
p->broadcast_sent_via[i]=0;
drop=0;
}else
p->broadcast_sent_via[i]=1;
}
// just drop it now
if (drop)
return -1;
p->sendBroadcast=1;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
p->broadcast_sent_via[i]=0;
}
struct overlay_frame *l=overlay_tx[q].last;
@ -231,7 +242,10 @@ int overlay_payload_enqueue(int q, struct overlay_frame *p)
if (0) dump_queue("after",q);
if (q==OQ_ISOCHRONOUS_VOICE) {
// Send a packet now
// Send a packet immediately to reduce latency
// Also this prevents aggregation of multiple voice frames which would
// increase the chance of packet loss leading to missing audio
// TODO, remove when we NACK and retry all frames
overlay_send_packet(NULL);
}

View File

@ -353,7 +353,7 @@ typedef struct overlay_interface {
These figures will be refined over time, and we will allow people to set them per-interface.
*/
int tick_ms; /* milliseconds per tick */
int send_broadcasts;
/* The time of the last tick on this interface in milli seconds */
time_ms_t last_tick_ms;
/* How many times have we abbreviated our address since we last announced it in full? */
@ -871,6 +871,7 @@ int overlay_route_node_info(overlay_mdp_frame *mdp,
int overlay_interface_register(char *name,
struct in_addr addr,
struct in_addr mask);
overlay_interface * overlay_interface_find(struct in_addr addr);
#ifdef HAVE_VOIPTEST
int app_pa_phone(int argc, const char *const *argv, struct command_line_option *o);