Extend packet queue to allow sending multiple copies of the same payload for reliability

This commit is contained in:
Jeremy Lakeman 2012-09-28 11:28:01 +09:30
parent fb7c9e5633
commit 2a7bd28e51
7 changed files with 38 additions and 59 deletions

View File

@ -399,7 +399,7 @@ int app_dna_lookup(int argc, const char *const *argv, struct command_line_option
{
if (rx.packetTypeAndFlags==MDP_ERROR)
{
WHYF(" Error message: %s", mdp.error.message);
WHYF(" Error message: %s", rx.error.message);
}
else if ((rx.packetTypeAndFlags&MDP_TYPE_MASK)==MDP_TX) {
/* Extract DID, Name, URI from response. */
@ -716,7 +716,7 @@ int app_mdp_ping(int argc, const char *const *argv, struct command_line_option *
int icount=atoi(count);
overlay_mdp_frame mdp;
bzero(&mdp, sizeof(overlay_mdp_frame));
/* Bind to MDP socket and await confirmation */
unsigned char srcsid[SID_SIZE];
int port=32768+(random()&32767);
@ -1370,6 +1370,7 @@ int app_id_self(int argc, const char *const *argv, struct command_line_option *o
if (debug & DEBUG_VERBOSE) DEBUG_argv("command", argc, argv);
/* List my own identities */
overlay_mdp_frame a;
bzero(&a, sizeof(overlay_mdp_frame));
int result;
int count=0;

View File

@ -223,7 +223,7 @@ int overlay_mdp_bind(unsigned char *localaddr,int port)
overlay_mdp_frame mdp;
mdp.packetTypeAndFlags=MDP_BIND|MDP_FORCE;
bcopy(localaddr,mdp.bind.sid,SID_SIZE);
mdp.bind.port_number=port;
mdp.bind.port=port;
int result=overlay_mdp_send(&mdp,MDP_AWAITREPLY,5000);
if (result) {
if (mdp.packetTypeAndFlags==MDP_ERROR)
@ -261,37 +261,33 @@ int overlay_mdp_getmyaddr(int index,unsigned char *sid)
int overlay_mdp_relevant_bytes(overlay_mdp_frame *mdp)
{
int len=4;
int len;
switch(mdp->packetTypeAndFlags&MDP_TYPE_MASK)
{
case MDP_GOODBYE:
/* no arguments for saying goodbye */
len=&mdp->raw[0]-(char *)mdp;
break;
case MDP_ADDRLIST:
len=&mdp->addrlist.sids[0][0]-(unsigned char *)mdp;
len+=mdp->addrlist.frame_sid_count*SID_SIZE;
len=(&mdp->addrlist.sids[0][0]-(unsigned char *)mdp) + mdp->addrlist.frame_sid_count*SID_SIZE;
break;
case MDP_GETADDRS:
len=&mdp->addrlist.sids[0][0]-(unsigned char *)mdp;
break;
case MDP_TX:
len=&mdp->out.payload[0]-(unsigned char *)mdp;
len+=mdp->out.payload_length;
len=(&mdp->out.payload[0]-(unsigned char *)mdp) + mdp->out.payload_length;
break;
case MDP_BIND:
len=&mdp->bind.sid[SID_SIZE]-(unsigned char *)mdp;
case MDP_BIND:
len=(&mdp->raw[0] - (char *)mdp) + sizeof(sockaddr_mdp);
break;
case MDP_ERROR:
/* This formulation is used so that we don't copy any bytes after the
end of the string, to avoid information leaks */
len=&mdp->error.message[0]-(char *)mdp;
len+=strlen(mdp->error.message)+1;
len=(&mdp->error.message[0]-(char *)mdp) + strlen(mdp->error.message)+1;
if (mdp->error.error) INFOF("mdp return/error code: %d:%s",mdp->error.error,mdp->error.message);
break;
case MDP_NODEINFO:
/* XXX problems with calculating this due to structure padding,
so doubled required space, and now it works. */
len=sizeof(overlay_mdp_nodeinfo)*2;
len=(&mdp->raw[0] - (char *)mdp) + sizeof(overlay_mdp_nodeinfo);
break;
default:
return WHY("Illegal MDP frame type.");

View File

@ -1173,6 +1173,10 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
break;
}
}
}else{
frame->send_copies --;
if (frame->send_copies>0)
keep_payload=1;
}
if (!keep_payload){

View File

@ -200,6 +200,10 @@ int overlay_mdp_process_bind_request(int sock, struct subscriber *subscriber, in
int flags, struct sockaddr_un *recvaddr, int recvaddrlen)
{
int i;
if (port<=0){
return WHYF("Port %d cannot be bound", port);
}
if (!mdp_bindings_initialised) {
/* Mark all slots as unused */
for(i=0;i<MDP_MAX_BINDINGS;i++)
@ -247,7 +251,8 @@ int overlay_mdp_process_bind_request(int sock, struct subscriber *subscriber, in
*/
free=random()%MDP_MAX_BINDINGS;
}
if (debug & DEBUG_MDPREQUESTS)
DEBUGF("Binding %s:%d",alloca_tohex_sid(subscriber->sid),port);
/* Okay, record binding and report success */
mdp_bindings[free].port=port;
mdp_bindings[free].subscriber=subscriber;
@ -373,6 +378,7 @@ int overlay_saw_mdp_containing_frame(struct overlay_frame *f, time_ms_t now)
Take payload from mdp frame itself.
*/
overlay_mdp_frame mdp;
bzero(&mdp, sizeof(overlay_mdp_frame));
/* Get source and destination addresses */
if (f->destination)
@ -919,6 +925,8 @@ int overlay_mdp_dispatch(overlay_mdp_frame *mdp,int userGeneratedFrameP,
rhizome_saw_voice_traffic();
}
frame->send_copies = mdp->out.send_copies;
if (overlay_payload_enqueue(qn, frame))
op_free(frame);
RETURN(0);
@ -1071,7 +1079,7 @@ void overlay_mdp_poll(struct sched_ent *alarm)
}
}
if (overlay_mdp_process_bind_request(alarm->poll.fd, subscriber, mdp->bind.port_number,
if (overlay_mdp_process_bind_request(alarm->poll.fd, subscriber, mdp->bind.port,
mdp->packetTypeAndFlags, recvaddr_un, recvaddrlen))
overlay_mdp_reply_error(alarm->poll.fd,recvaddr_un,recvaddrlen,3, "Port already in use");
else

View File

@ -31,6 +31,8 @@ struct overlay_frame {
unsigned int modifiers;
unsigned char ttl;
// temporary hack to improve reliability before implementing per-packet nack's
int send_copies;
/* Mark which interfaces the frame has been sent on,
so that we can ensure that broadcast frames get sent

View File

@ -213,6 +213,11 @@ int overlay_payload_enqueue(int q, struct overlay_frame *p)
if (overlay_tx[q].length>=overlay_tx[q].maxLength)
return WHYF("Queue #%d congested (size = %d)",q,overlay_tx[q].maxLength);
if (p->send_copies<=0)
p->send_copies=1;
else if(p->send_copies>5)
return WHY("Too many copies requested");
if (!p->destination){
int i;
int drop=1;

View File

@ -712,15 +712,12 @@ unsigned char *keyring_get_nm_bytes(sockaddr_mdp *priv,sockaddr_mdp *pub);
typedef struct overlay_mdp_data_frame {
sockaddr_mdp src;
sockaddr_mdp dst;
unsigned short payload_length;
uint16_t payload_length;
// temporary hack to improve reliability before implementing per-packet nack's
int send_copies;
unsigned char payload[MDP_MTU-100];
} overlay_mdp_data_frame;
typedef struct overlay_mdp_bind_request {
unsigned int port_number;
unsigned char sid[SID_SIZE];
} overlay_mdp_bind_request;
typedef struct overlay_mdp_error {
unsigned int error;
char message[128];
@ -735,39 +732,6 @@ typedef struct overlay_mdp_addrlist {
unsigned char sids[MDP_MAX_SID_REQUEST][SID_SIZE];
} overlay_mdp_addrlist;
/* elements sorted by size for alignment */
typedef struct overlay_mdp_vompevent {
/* Once a call has been established, this is how the MDP/VoMP server
and user-end process talk about the call. */
unsigned int call_session_token;
unsigned int audio_sample_endtime;
unsigned int audio_sample_starttime;
time_ms_t last_activity;
unsigned int flags;
unsigned short audio_sample_bytes;
unsigned char audio_sample_codec;
unsigned char local_state;
unsigned char remote_state;
/* list of codecs the registering party is willing to support
(for VOMPEVENT_REGISTERINTEREST) */
unsigned char supported_codecs[257];
union {
struct {
/* Used to precisely define the call end points during call
setup. */
char local_did[64];
char remote_did[64];
unsigned char local_sid[SID_SIZE];
unsigned char remote_sid[SID_SIZE];
/* session numbers of other calls in progress
(for VOMPEVENT_CALLINFO) */
unsigned int other_calls_sessions[VOMP_MAX_CALLS];
unsigned char other_calls_states[VOMP_MAX_CALLS];
};
unsigned char audio_bytes[MAX_AUDIO_BYTES];
};
} overlay_mdp_vompevent;
typedef struct overlay_mdp_nodeinfo {
unsigned char sid[SID_SIZE];
int sid_prefix_length; /* must be long enough to be unique */
@ -783,13 +747,12 @@ typedef struct overlay_mdp_nodeinfo {
} overlay_mdp_nodeinfo;
typedef struct overlay_mdp_frame {
unsigned int packetTypeAndFlags;
uint16_t packetTypeAndFlags;
union {
overlay_mdp_data_frame out;
overlay_mdp_data_frame in;
overlay_mdp_bind_request bind;
sockaddr_mdp bind;
overlay_mdp_addrlist addrlist;
overlay_mdp_vompevent vompevent;
overlay_mdp_nodeinfo nodeinfo;
overlay_mdp_error error;
/* 2048 is too large (causes EMSGSIZE errors on OSX, but probably fine on