Allow port reuse for mdp bindings

This commit is contained in:
Jeremy Lakeman 2016-05-04 15:42:10 +09:30
parent 5feb67512c
commit 9b5b82d972
2 changed files with 266 additions and 248 deletions

View File

@ -318,8 +318,9 @@ static int app_trace(const struct cli_parsed *parsed, struct cli_context *contex
if (mdp_send(mdp_sockfd, &mdp_header, payload, len))
goto end;
ssize_t recv_len = mdp_poll_recv(mdp_sockfd, gettime_ms()+500, &mdp_header, payload, sizeof payload);
if (recv_len == -1)
break;
if (recv_len>0){
ob_clear(b);
ob_limitsize(b,recv_len);

View File

@ -69,7 +69,6 @@ uint16_t mdp_loopback_port;
static void overlay_mdp_poll(struct sched_ent *alarm);
static void mdp_poll2(struct sched_ent *alarm);
static int overlay_mdp_releasebindings(struct socket_address *client);
static struct profile_total mdp_stats = { .name="overlay_mdp_poll" };
static struct sched_ent mdp_sock = {
@ -90,6 +89,19 @@ static struct sched_ent mdp_sock2_inet = {
.poll={.fd = -1},
};
struct mdp_binding{
struct mdp_binding *_next;
struct subscriber *subscriber;
mdp_port_t port;
uint8_t version;
uint8_t flags;
struct socket_address client;
time_ms_t binding_time;
};
struct mdp_binding *mdp_bindings=NULL;
mdp_port_t next_port_binding=256;
static int overlay_saw_mdp_frame(
struct internal_mdp_header *header,
struct overlay_buffer *payload);
@ -97,6 +109,38 @@ static int overlay_saw_mdp_frame(
static int mdp_send2(struct __sourceloc, const struct socket_address *client, const struct mdp_header *header,
const uint8_t *payload, size_t payload_len);
static uint8_t has_dead_clients=0;
static int mark_dead_client(const struct socket_address *client)
{
struct mdp_binding *binding = mdp_bindings;
while(binding){
if (cmp_sockaddr(&binding->client, client)==0){
binding->port = 0;
has_dead_clients = 1;
}
binding = binding->_next;
}
return 0;
}
static int free_dead_clients(){
if (!has_dead_clients)
return 0;
//TODO send dummy frame?
struct mdp_binding **binding = &mdp_bindings;
while(*binding){
struct mdp_binding *b = (*binding);
if (b->port==0){
(*binding) = b->_next;
free(b);
}else{
binding = &b->_next;
}
}
has_dead_clients=1;
return 0;
}
/* Delete all UNIX socket files in instance directory. */
void overlay_mdp_clean_socket_files()
{
@ -232,20 +276,8 @@ int overlay_mdp_setup_sockets()
return 0;
}
#define MDP_MAX_BINDINGS 100
#define MDP_MAX_SOCKET_NAME_LEN 110
struct mdp_binding{
struct subscriber *subscriber;
mdp_port_t port;
int version;
struct socket_address client;
time_ms_t binding_time;
};
struct mdp_binding mdp_bindings[MDP_MAX_BINDINGS];
int mdp_bindings_initialised=0;
mdp_port_t next_port_binding=256;
static int overlay_mdp_reply(int sock, struct socket_address *client,
overlay_mdp_frame *mdpreply)
@ -261,7 +293,7 @@ static int overlay_mdp_reply(int sock, struct socket_address *client,
if (errno == ENOENT){
/* far-end of socket has died, so drop binding */
INFOF("Closing dead MDP client '%s'", alloca_socket_address(client));
overlay_mdp_releasebindings(client);
mark_dead_client(client);
}
return -1;
}
@ -296,18 +328,6 @@ static int overlay_mdp_reply_ok(int sock, struct socket_address *client,
return overlay_mdp_reply_error(sock, client, 0, message);
}
static int overlay_mdp_releasebindings(struct socket_address *client)
{
/* Free up any MDP bindings held by this client. */
int i;
for(i=0;i<MDP_MAX_BINDINGS;i++)
if (cmp_sockaddr(&mdp_bindings[i].client, client)==0)
mdp_bindings[i].port=0;
return 0;
}
static int overlay_mdp_process_bind_request(struct subscriber *subscriber, mdp_port_t port,
int flags, struct socket_address *client)
{
@ -317,36 +337,25 @@ static int overlay_mdp_process_bind_request(struct subscriber *subscriber, mdp_p
return WHYF("Port %d cannot be bound", port);
}
if (!mdp_bindings_initialised) {
/* Mark all slots as unused */
int i;
for(i=0;i<MDP_MAX_BINDINGS;i++)
mdp_bindings[i].port=0;
mdp_bindings_initialised=1;
}
/* See if binding already exists */
int free=-1;
{
int i;
for(i=0;i<MDP_MAX_BINDINGS;i++) {
struct mdp_binding *b = mdp_bindings;
while(b){
/* Look for duplicate bindings */
if (mdp_bindings[i].port == port && mdp_bindings[i].subscriber == subscriber) {
if (cmp_sockaddr(&mdp_bindings[i].client, client)==0) {
if (b->port == port && b->subscriber == subscriber) {
if (cmp_sockaddr(&b->client, client)==0) {
// this client already owns this port binding?
INFO("Identical binding exists");
return 0;
}else if(flags&MDP_FORCE){
// steal the port binding
free=i;
break;
}else if((flags & MDP_FLAG_REUSE) && (b->flags & MDP_FLAG_REUSE)){
// allow mutliple bindings
}else{
return WHY("Port already in use");
}
}
/* Look for free slots in case we need one */
if ((free==-1)&&(mdp_bindings[i].port==0)) free=i;
}
b=b->_next;
}
/* Okay, so no binding exists. Make one, and return success.
@ -356,22 +365,19 @@ static int overlay_mdp_process_bind_request(struct subscriber *subscriber, mdp_p
probing the sockets periodically (by sending an MDP NOOP frame perhaps?) and
destroying any socket that reports an error.
*/
if (free==-1) {
/* XXX Should we probe for stale bindings here and now, since this is when
we want the spare slots ?
Picking one at random is as good a policy as any.
Call listeners don't have a port binding, so are unaffected by this.
*/
free=random()%MDP_MAX_BINDINGS;
if (!b){
b = emalloc_zero(sizeof(struct mdp_binding));
b->_next = mdp_bindings;
mdp_bindings = b;
}
/* Okay, record binding and report success */
mdp_bindings[free].port=port;
mdp_bindings[free].subscriber=subscriber;
mdp_bindings[free].version=0;
mdp_bindings[free].client.addrlen = client->addrlen;
memcpy(&mdp_bindings[free].client.addr, &client->addr, client->addrlen);
mdp_bindings[free].binding_time=gettime_ms();
b->port=port;
b->subscriber=subscriber;
b->version=0;
b->flags = flags & MDP_FLAG_REUSE;
b->client.addrlen = client->addrlen;
memcpy(&b->client.addr, &client->addr, client->addrlen);
b->binding_time=gettime_ms();
return 0;
}
@ -513,44 +519,13 @@ void mdp_init_response(const struct internal_mdp_header *in, struct internal_mdp
out->qos = in->qos;
}
static int overlay_saw_mdp_frame(
static int send_packet_to_client(
struct internal_mdp_header *header,
struct overlay_buffer *payload)
{
IN();
struct overlay_buffer *payload,
int version,
struct socket_address *client){
if (!allow_inbound_packet(header))
RETURN(0);
/* Regular MDP frame addressed to us. Look for matching port binding,
and if available, push to client. Else do nothing, or if we feel nice
send back a connection refused type message? Silence is probably the
more prudent path.
*/
DEBUGF(mdprequests, "Received packet (MDP ports: src=%s*:%"PRImdp_port_t", dst=%"PRImdp_port_t")",
alloca_tohex_sid_t_trunc(header->source->sid, 14),
header->source_port, header->destination_port);
int match=-1;
int i;
for(i=0;i<MDP_MAX_BINDINGS;i++)
{
if (mdp_bindings[i].port!=header->destination_port)
continue;
if ((!header->destination) || mdp_bindings[i].subscriber == header->destination){
/* exact match, so stop searching */
match=i;
break;
}else if (!mdp_bindings[i].subscriber){
/* If we find an "ANY" binding, remember it. But we will prefer an exact match if we find one */
match=i;
}
}
if (match>-1) {
switch(mdp_bindings[match].version){
switch(version){
case 0:
{
overlay_mdp_frame mdp;
@ -561,8 +536,7 @@ static int overlay_saw_mdp_frame(
ssize_t len = overlay_mdp_relevant_bytes(&mdp);
if (len < 0)
RETURN(WHY("unsupported MDP packet type"));
struct socket_address *client = &mdp_bindings[match].client;
return WHY("unsupported MDP packet type");
DEBUGF(mdprequests, "Forwarding packet to client %s", alloca_socket_address(client));
ssize_t r = sendto(mdp_sock.poll.fd, &mdp, len, 0, &client->addr, client->addrlen);
if (r == -1){
@ -570,17 +544,16 @@ static int overlay_saw_mdp_frame(
if (errno == ENOENT){
/* far-end of socket has died, so drop binding */
INFOF("Closing dead MDP client '%s'", alloca_socket_address(client));
overlay_mdp_releasebindings(client);
mark_dead_client(client);
}
RETURN(-1);
return -1;
}
if (r != len)
RETURN(WHYF("sendto() sent %zu bytes of MDP reply (%zu) to %s", (size_t)r, (size_t)len, alloca_socket_address(client)));
RETURN(0);
WARNF("sendto() sent %zu bytes of MDP reply (%zu) to %s", (size_t)r, (size_t)len, alloca_socket_address(client));
return 0;
}
case 1:
{
struct socket_address *client = &mdp_bindings[match].client;
struct mdp_header client_header;
client_header.local.sid=header->destination?header->destination->sid:SID_BROADCAST;
client_header.local.port=header->destination_port;
@ -595,26 +568,68 @@ static int overlay_saw_mdp_frame(
size_t len = ob_remaining(payload);
const uint8_t *ptr = ob_get_bytes_ptr(payload, len);
RETURN(mdp_send2(__WHENCE__, client, &client_header, ptr, len));
return mdp_send2(__WHENCE__, client, &client_header, ptr, len);
}
}
} else {
return -1;
}
static int overlay_saw_mdp_frame(
struct internal_mdp_header *header,
struct overlay_buffer *payload)
{
if (!allow_inbound_packet(header))
return 0;
/* Regular MDP frame addressed to us. Look for matching port binding,
and if available, push to client. Else do nothing, or if we feel nice
send back a connection refused type message? Silence is probably the
more prudent path.
*/
DEBUGF(mdprequests, "Received packet (MDP ports: src=%s*:%"PRImdp_port_t", dst=%"PRImdp_port_t")",
alloca_tohex_sid_t_trunc(header->source->sid, 14),
header->source_port, header->destination_port);
struct mdp_binding *b = mdp_bindings;
// first look for an exact subscriber match
while(b){
if (b->port==header->destination_port && b->subscriber &&
((!header->destination) || b->subscriber == header->destination)
){
/* match */
if (send_packet_to_client(header, payload, b->version, &b->client)==0
&& header->destination
&& (b->flags & MDP_FLAG_REUSE)==0){
goto end;
}
}
b=b->_next;
}
// then look for ANY bindings
while(b){
if (b->port==header->destination_port && !b->subscriber){
/* match */
if (send_packet_to_client(header, payload, b->version, &b->client)==0 && (b->flags & MDP_FLAG_REUSE)==0)
goto end;
}
b=b->_next;
}
// look for a compile time defined internal binding
struct internal_binding *binding;
for (binding = SECTION_START(bindings); binding < SECTION_END(bindings); ++binding) {
if (binding->port == header->destination_port)
RETURN(binding->function(header, payload));
if (binding->port == header->destination_port){
binding->function(header, payload);
goto end;
}
}
/* Unbound socket. We won't be sending ICMP style connection refused
messages, partly because they are a waste of bandwidth. */
RETURN(WHYF("Received packet for which no listening process exists (MDP ports: src=%d, dst=%d",
header->source_port, header->destination_port));
}
RETURN(0);
OUT();
end:
free_dead_clients();
return 0;
}
int overlay_mdp_dnalookup_reply(struct subscriber *dest, mdp_port_t dest_port,
@ -656,26 +671,21 @@ static int overlay_mdp_check_binding(struct subscriber *subscriber, mdp_port_t p
if (!client)
return 0;
/* Check if the address is in the list of bound addresses,
and that the recvaddr matches. */
int i;
for(i = 0; i < MDP_MAX_BINDINGS; ++i) {
if (mdp_bindings[i].port != port)
continue;
if ((!mdp_bindings[i].subscriber) || mdp_bindings[i].subscriber == subscriber) {
/* Binding matches, now make sure the sockets match */
if (cmp_sockaddr(&mdp_bindings[i].client, client)==0) {
/* Everything matches, so this unix socket and MDP address combination is valid */
/* Check if this client has bound this sid/port */
struct mdp_binding *b = mdp_bindings;
while(b){
if (b->port == port
&& (!b->subscriber || b->subscriber == subscriber)
&& cmp_sockaddr(&b->client, client)==0)
return 0;
}
}
b=b->_next;
}
return WHYF("No matching binding: addr=%s port=%"PRImdp_port_t" -- possible spoofing attack",
WARNF("No matching binding: addr=%s port=%"PRImdp_port_t,
alloca_tohex_sid_t(subscriber->sid),
port
);
return -1;
}
void overlay_mdp_encode_ports(struct overlay_buffer *plaintext, mdp_port_t dst_port, mdp_port_t src_port)
@ -1344,14 +1354,17 @@ static mdp_port_t get_next_port(){
again:
if (next_port_binding > 32*1024)
// note, we're assuming that there are NO internal port bindings >=256
next_port_binding=256;
else
next_port_binding++;
unsigned i;
for(i=0;i<MDP_MAX_BINDINGS;i++) {
if (mdp_bindings[i].port==next_port_binding)
// make sure there are *no* bindings for this port on any SID.
struct mdp_binding *b = mdp_bindings;
while(b){
if (b->port == next_port_binding)
goto again;
b = b->_next;
}
return next_port_binding;
}
@ -1363,18 +1376,8 @@ static void mdp_process_packet(struct socket_address *client, struct mdp_header
bzero(&internal_header, sizeof(internal_header));
if ((header->flags & MDP_FLAG_CLOSE) && header->local.port==0){
int i;
for(i=0;i<MDP_MAX_BINDINGS;i++) {
if (mdp_bindings[i].port!=0
&& cmp_sockaddr(&mdp_bindings[i].client, client)==0){
DEBUGF(mdprequests, "Unbind MDP %s:%d from %s",
mdp_bindings[i].subscriber?alloca_tohex_sid_t(mdp_bindings[i].subscriber->sid):"All",
mdp_bindings[i].port,
alloca_socket_address(client));
mdp_bindings[i].port=0;
}
}
// should we expect clients to wait?
mark_dead_client(client);
free_dead_clients();
return;
}
@ -1396,11 +1399,32 @@ static void mdp_process_packet(struct socket_address *client, struct mdp_header
}
}
struct mdp_binding *binding=NULL, *free_slot=NULL;
struct mdp_binding **pclient_binding=NULL;
struct mdp_binding *client_binding=NULL;
struct mdp_binding *conflicting_binding=NULL;
// assign the next available port number
if (header->local.port==0 && header->flags & MDP_FLAG_BIND){
header->local.port=get_next_port();
}else{
// find existing matching or conflicting bindings
struct mdp_binding **binding = &mdp_bindings;
while(*binding){
struct mdp_binding *b = (*binding);
if (b->port == header->local.port
&& b->subscriber == internal_header.source){
if (cmp_sockaddr(&b->client, client)==0){
client_binding = b;
pclient_binding = binding;
break;
}
// any conflicting binding will do;
conflicting_binding = b;
}
binding = &b->_next;
}
}
internal_header.source_port = header->local.port;
@ -1408,51 +1432,42 @@ static void mdp_process_packet(struct socket_address *client, struct mdp_header
internal_header.ttl = header->ttl;
internal_header.qos = header->qos;
// find matching binding
{
unsigned i;
for(i=0;i<MDP_MAX_BINDINGS;i++) {
if ((!free_slot) && mdp_bindings[i].port==0)
free_slot=&mdp_bindings[i];
if (mdp_bindings[i].port == header->local.port){
if (mdp_bindings[i].subscriber == internal_header.source){
binding = &mdp_bindings[i];
break;
}else if(!mdp_bindings[i].subscriber)
binding = &mdp_bindings[i];
}
}
}
if (header->flags & MDP_FLAG_BIND){
if (binding){
WHYF("Port %d already bound", header->local.port);
mdp_reply_error(client, header);
return;
}
if (!free_slot){
WHY("Max supported bindings reached");
if (conflicting_binding && (header->flags & MDP_FLAG_REUSE)==0 && (conflicting_binding->flags & MDP_FLAG_REUSE))
conflicting_binding = NULL;
if (conflicting_binding){
WHYF("Sorry %s, %s:%u is already bound by %s",
alloca_socket_address(client),
alloca_tohex_sid_t(header->local.sid),
header->local.port,
alloca_socket_address(&conflicting_binding->client));
mdp_reply_error(client, header);
return;
}
if (!client_binding){
DEBUGF(mdprequests, "Bind MDP %s:%d to %s",
alloca_tohex_sid_t(header->local.sid),
header->local.port,
alloca_socket_address(client));
client_binding = emalloc_zero(sizeof(struct mdp_binding));
if (!client_binding){
mdp_reply_error(client, header);
return;
}
// claim binding
binding = free_slot;
binding->port = header->local.port;
binding->subscriber = internal_header.source;
bcopy(&client->addr, &binding->client.addr, client->addrlen);
binding->client.addrlen = client->addrlen;
binding->binding_time=gettime_ms();
binding->version=1;
client_binding->port = header->local.port;
client_binding->subscriber = internal_header.source;
bcopy(&client->addr, &client_binding->client.addr, client->addrlen);
client_binding->client.addrlen = client->addrlen;
client_binding->binding_time=gettime_ms();
client_binding->version=1;
// tell the client what we actually bound (with flags & MDP_FLAG_BIND still set)
client_binding->_next = mdp_bindings;
mdp_bindings = client_binding;
}
// tell the client that they (still?) have this binding (with flags & MDP_FLAG_BIND still set)
mdp_reply2(__WHENCE__, client, header, MDP_FLAG_BIND, NULL, 0);
}
@ -1460,11 +1475,9 @@ static void mdp_process_packet(struct socket_address *client, struct mdp_header
// process local commands
switch(header->remote.port){
case MDP_LISTEN:
// double check that this binding belongs to this connection
if (!binding
|| cmp_sockaddr(&binding->client, client)!=0){
WHYF("That port is not bound by you %s vs %s",
binding?alloca_socket_address(&binding->client):"(none)",
// double check that you have a binding
if (!client_binding){
WHYF("That port is not bound by you %s",
alloca_socket_address(client));
mdp_reply_error(client, header);
}
@ -1494,14 +1507,12 @@ static void mdp_process_packet(struct socket_address *client, struct mdp_header
}
}else{
// double check that this binding belongs to this connection
if (!binding
|| !internal_header.source
|| header->local.port == 0
|| cmp_sockaddr(&binding->client, client)!=0){
WHYF("Can't send data packet, no matching port binding for %s:%d!",
// double check that you have a binding
if (!client_binding){
WHYF("Can't send data packet, %s:%d is not bound to %s!",
alloca_tohex_sid_t(header->local.sid),
header->local.port);
header->local.port,
alloca_socket_address(client));
mdp_reply_error(client, header);
return;
}
@ -1521,15 +1532,14 @@ static void mdp_process_packet(struct socket_address *client, struct mdp_header
}
// remove binding
if (binding
&& header->flags & MDP_FLAG_CLOSE
&& cmp_sockaddr(&binding->client, client)==0){
if (client_binding
&& header->flags & MDP_FLAG_CLOSE){
DEBUGF(mdprequests, "Unbind MDP %s:%d from %s",
binding->subscriber?alloca_tohex_sid_t(binding->subscriber->sid):"All",
binding->port,
client_binding->subscriber?alloca_tohex_sid_t(client_binding->subscriber->sid):"All",
client_binding->port,
alloca_socket_address(client));
binding->port=0;
binding=NULL;
*pclient_binding = client_binding->_next;
free(client_binding);
}
}
@ -1566,8 +1576,15 @@ static int mdp_send2(struct __sourceloc __whence, const struct socket_address *c
if (fd==-1)
return WHYF("Unhandled client family %d", client->addr.sa_family);
if (sendmsg(fd, &hdr, 0)<0)
return WHY_perror("sendmsg");
if (sendmsg(fd, &hdr, 0)<0){
WHY_perror("sendmsg");
if (errno == ENOENT){
/* far-end of socket has died, so drop binding */
INFOF("Closing dead MDP client '%s'", alloca_socket_address(client));
mark_dead_client(client);
}
return -1;
}
return 0;
}
@ -1641,7 +1658,7 @@ static void overlay_mdp_poll(struct sched_ent *alarm)
switch (mdp_type) {
case MDP_GOODBYE:
DEBUGF(mdprequests, "MDP_GOODBYE from %s", alloca_socket_address(&client));
overlay_mdp_releasebindings(&client);
mark_dead_client(&client);
return;
case MDP_ROUTING_TABLE: