ZeroTierOne/attic/historic/anode/libanode/system_transport.c

949 lines
32 KiB
C
Raw Normal View History

/* libanode: the Anode C reference implementation
* Copyright (C) 2009-2010 Adam Ierymenko <adam.ierymenko@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. */
#include <stdio.h>
#include <netdb.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include "anode.h"
#include "impl/mutex.h"
#include "impl/thread.h"
#include "impl/misc.h"
#include "impl/dns_txt.h"
#ifdef WINDOWS
#include <windows.h>
#include <winsock2.h>
#define AnodeSystemTransport__close_socket(s) closesocket((s))
#define ANODE_USE_SELECT 1
#else
#include <poll.h>
#include <unistd.h>
#define AnodeSystemTransport__close_socket(s) close((s))
#endif
static const char *AnodeSystemTransport_CLASS = "SystemTransport";
/* ======================================================================== */
struct AnodeSystemTransport;
struct AnodeSystemTransport_AnodeSocket
{
AnodeSocket base; /* must be first */
unsigned int entry_idx;
};
#define ANODE_SYSTEM_TRANSPORT_DNS_MAX_RESULTS 16
struct AnodeSystemTransport__dns_request
{
struct AnodeSystemTransport__dns_request *next;
AnodeThread *thread;
struct AnodeSystemTransport *owner;
void (*event_handler)(const AnodeEvent *event);
char name[256];
enum AnodeTransportDnsIncludeMode ipv4_include_mode;
enum AnodeTransportDnsIncludeMode ipv6_include_mode;
enum AnodeTransportDnsIncludeMode anode_include_mode;
AnodeNetworkAddress addresses[ANODE_SYSTEM_TRANSPORT_DNS_MAX_RESULTS];
unsigned int address_count;
int error_code;
};
#ifdef ANODE_USE_SELECT
typedef int AnodeSystemTransport__poll_fd; /* for select() */
#else
typedef struct pollfd AnodeSystemTransport__poll_fd; /* for poll() */
#endif
struct AnodeSystemTransport
{
AnodeTransport interface; /* must be first */
AnodeTransport *base;
#ifdef ANODE_USE_SELECT
FD_SET readfds;
FD_SET writefds;
#endif
void (*default_event_handler)(const AnodeEvent *event);
AnodeSystemTransport__poll_fd *fds;
struct AnodeSystemTransport_AnodeSocket *sockets;
unsigned int fd_count;
unsigned int fd_capacity;
struct AnodeSystemTransport__dns_request *pending_dns_requests;
int invoke_pipe[2];
AnodeMutex invoke_pipe_m;
void *invoke_pipe_buf[2];
unsigned int invoke_pipe_buf_ptr;
};
/* ======================================================================== */
/* Internal helper methods */
static unsigned int AnodeSystemTransport__add_entry(struct AnodeSystemTransport *transport)
{
if ((transport->fd_count + 1) > transport->fd_capacity) {
transport->fd_capacity += 8;
transport->fds = realloc(transport->fds,sizeof(AnodeSystemTransport__poll_fd) * transport->fd_capacity);
transport->sockets = realloc(transport->sockets,sizeof(struct AnodeSystemTransport_AnodeSocket) * transport->fd_capacity);
}
return transport->fd_count++;
}
static void AnodeSystemTransport__remove_entry(struct AnodeSystemTransport *transport,const unsigned int idx)
{
unsigned int i;
--transport->fd_count;
for(i=idx;i<transport->fd_count;++i) {
Anode_memcpy(&transport->fds[i],&transport->fds[i+1],sizeof(AnodeSystemTransport__poll_fd));
Anode_memcpy(&transport->sockets[i],&transport->sockets[i+1],sizeof(struct AnodeSystemTransport_AnodeSocket));
}
if ((transport->fd_capacity - transport->fd_count) > 16) {
transport->fd_capacity -= 16;
transport->fds = realloc(transport->fds,sizeof(AnodeSystemTransport__poll_fd) * transport->fd_capacity);
transport->sockets = realloc(transport->sockets,sizeof(struct AnodeSystemTransport_AnodeSocket) * transport->fd_capacity);
}
}
static void AnodeSystemTransport__dns_invoke_on_completion(void *_dreq)
{
struct AnodeSystemTransport__dns_request *dreq = (struct AnodeSystemTransport__dns_request *)_dreq;
struct AnodeSystemTransport__dns_request *ptr,**lastnext;
AnodeThread_join(dreq->thread);
ptr = dreq->owner->pending_dns_requests;
lastnext = &dreq->owner->pending_dns_requests;
while (ptr) {
if (ptr == dreq) {
*lastnext = ptr->next;
break;
} else {
lastnext = &ptr->next;
ptr = ptr->next;
}
}
free(dreq);
}
static void AnodeSystemTransport__dns_thread_main(void *_dreq)
{
struct AnodeSystemTransport__dns_request *dreq = (struct AnodeSystemTransport__dns_request *)_dreq;
dreq->owner->interface.invoke((AnodeTransport *)dreq->owner,dreq,&AnodeSystemTransport__dns_invoke_on_completion);
}
static void AnodeSystemTransport__do_close(struct AnodeSystemTransport *transport,struct AnodeSystemTransport_AnodeSocket *sock,const int error_code,const int generate_event)
{
AnodeEvent evbuf;
int fd;
if (sock->base.class_name == AnodeSystemTransport_CLASS) {
#ifdef ANODE_USE_SELECT
fd = (int)(transport->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx]);
#else
fd = transport->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx].fd;
#endif
if ((sock->base.type == ANODE_SOCKET_STREAM_CONNECTION)&&(sock->base.state != ANODE_SOCKET_CLOSED)) {
sock->base.state = ANODE_SOCKET_CLOSED;
if (generate_event) {
evbuf.type = ANODE_TRANSPORT_EVENT_STREAM_CLOSED;
evbuf.transport = (AnodeTransport *)transport;
evbuf.sock = (AnodeSocket *)sock;
evbuf.datagram_from = NULL;
evbuf.dns_name = NULL;
evbuf.dns_addresses = NULL;
evbuf.dns_address_count = 0;
evbuf.error_code = error_code;
evbuf.data_length = 0;
evbuf.data = NULL;
if (sock->base.event_handler)
sock->base.event_handler(&evbuf);
else if (transport->default_event_handler)
transport->default_event_handler(&evbuf);
}
}
AnodeSystemTransport__close_socket(fd);
AnodeSystemTransport__remove_entry(transport,((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx);
#ifdef ANODE_USE_SELECT
FD_CLR(sock,&THIS->readfds);
FD_CLR(sock,&THIS->writefds);
#endif
} else transport->base->close(transport->base,(AnodeSocket *)sock);
}
static int AnodeSystemTransport__populate_network_endpoint(const struct sockaddr_storage *saddr,AnodeNetworkEndpoint *ep)
{
switch(saddr->ss_family) {
case AF_INET:
ep->address.type = ANODE_NETWORK_ADDRESS_IPV4;
*((uint32_t *)ep->address.bits) = ((struct sockaddr_in *)saddr)->sin_addr.s_addr;
ep->port = ntohs(((struct sockaddr_in *)saddr)->sin_port);
return 1;
case AF_INET6:
ep->address.type = ANODE_NETWORK_ADDRESS_IPV6;
Anode_memcpy(ep->address.bits,((struct sockaddr_in6 *)saddr)->sin6_addr.s6_addr,16);
ep->port = ntohs(((struct sockaddr_in6 *)saddr)->sin6_port);
return 1;
}
return 0;
}
/* ======================================================================== */
#ifdef THIS
#undef THIS
#endif
#define THIS ((struct AnodeSystemTransport *)transport)
static void AnodeSystemTransport_invoke(AnodeTransport *transport,
void *ptr,
void (*func)(void *))
{
void *invoke_msg[2];
invoke_msg[0] = ptr;
invoke_msg[1] = (void *)func;
AnodeMutex_lock(&THIS->invoke_pipe_m);
write(THIS->invoke_pipe[1],(void *)(&invoke_msg),sizeof(invoke_msg));
AnodeMutex_unlock(&THIS->invoke_pipe_m);
}
static void AnodeSystemTransport_dns_resolve(AnodeTransport *transport,
const char *name,
void (*event_handler)(const AnodeEvent *),
enum AnodeTransportDnsIncludeMode ipv4_include_mode,
enum AnodeTransportDnsIncludeMode ipv6_include_mode,
enum AnodeTransportDnsIncludeMode anode_include_mode)
{
struct AnodeSystemTransport__dns_request *dreq = malloc(sizeof(struct AnodeSystemTransport__dns_request));
dreq->owner = THIS;
dreq->event_handler = event_handler;
Anode_str_copy(dreq->name,name,sizeof(dreq->name));
dreq->ipv4_include_mode = ipv4_include_mode;
dreq->ipv6_include_mode = ipv6_include_mode;
dreq->anode_include_mode = anode_include_mode;
dreq->address_count = 0;
dreq->error_code = 0;
dreq->next = THIS->pending_dns_requests;
THIS->pending_dns_requests = dreq;
dreq->thread = AnodeThread_create(&AnodeSystemTransport__dns_thread_main,dreq,0);
}
static AnodeSocket *AnodeSystemTransport_datagram_listen(AnodeTransport *transport,
const AnodeNetworkAddress *local_address,
int local_port,
int *error_code)
{
struct sockaddr_in sin4;
struct sockaddr_in6 sin6;
struct AnodeSystemTransport_AnodeSocket *sock;
unsigned int entry_idx;
int fd;
int tmp;
switch(local_address->type) {
case ANODE_NETWORK_ADDRESS_IPV4:
fd = socket(AF_INET,SOCK_DGRAM,0);
if (fd <= 0) {
*error_code = ANODE_ERR_UNABLE_TO_BIND;
return (AnodeSocket *)0;
}
tmp = 1;
setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&tmp,sizeof(tmp));
fcntl(fd,F_SETFL,O_NONBLOCK);
Anode_zero(&sin4,sizeof(struct sockaddr_in));
sin4.sin_family = AF_INET;
sin4.sin_port = htons(local_port);
sin4.sin_addr.s_addr = *((uint32_t *)local_address->bits);
if (bind(fd,(const struct sockaddr *)&sin4,sizeof(sin4))) {
AnodeSystemTransport__close_socket(fd);
*error_code = ANODE_ERR_UNABLE_TO_BIND;
return (AnodeSocket *)0;
}
break;
case ANODE_NETWORK_ADDRESS_IPV6:
fd = socket(AF_INET6,SOCK_DGRAM,0);
if (fd <= 0) {
*error_code = ANODE_ERR_UNABLE_TO_BIND;
return (AnodeSocket *)0;
}
tmp = 1; setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&tmp,sizeof(tmp));
fcntl(fd,F_SETFL,O_NONBLOCK);
#ifdef IPV6_V6ONLY
tmp = 1; setsockopt(fd,IPPROTO_IPV6,IPV6_V6ONLY,&tmp,sizeof(tmp));
#endif
Anode_zero(&sin6,sizeof(struct sockaddr_in6));
sin6.sin6_family = AF_INET6;
sin6.sin6_port = htons(local_port);
Anode_memcpy(sin6.sin6_addr.s6_addr,local_address->bits,16);
if (bind(fd,(const struct sockaddr *)&sin6,sizeof(sin6))) {
AnodeSystemTransport__close_socket(fd);
*error_code = ANODE_ERR_UNABLE_TO_BIND;
return (AnodeSocket *)0;
}
break;
default:
if (THIS->base)
return THIS->base->datagram_listen(THIS->base,local_address,local_port,error_code);
else {
*error_code = ANODE_ERR_ADDRESS_TYPE_NOT_SUPPORTED;
return (AnodeSocket *)0;
}
}
entry_idx = AnodeSystemTransport__add_entry(THIS);
sock = &(THIS->sockets[entry_idx]);
sock->base.type = ANODE_SOCKET_DATAGRAM;
sock->base.state = ANODE_SOCKET_OPEN;
Anode_memcpy(&sock->base.endpoint.address,local_address,sizeof(AnodeNetworkAddress));
sock->base.endpoint.port = local_port;
sock->base.class_name = AnodeSystemTransport_CLASS;
sock->base.user_ptr[0] = NULL;
sock->base.user_ptr[1] = NULL;
sock->base.event_handler = NULL;
sock->entry_idx = entry_idx;
THIS->fds[entry_idx].fd = fd;
THIS->fds[entry_idx].events = POLLIN;
THIS->fds[entry_idx].revents = 0;
*error_code = 0;
return (AnodeSocket *)sock;
}
static AnodeSocket *AnodeSystemTransport_stream_listen(AnodeTransport *transport,
const AnodeNetworkAddress *local_address,
int local_port,
int *error_code)
{
struct sockaddr_in sin4;
struct sockaddr_in6 sin6;
struct AnodeSystemTransport_AnodeSocket *sock;
unsigned int entry_idx;
int fd;
int tmp;
switch(local_address->type) {
case ANODE_NETWORK_ADDRESS_IPV4:
fd = socket(AF_INET,SOCK_STREAM,0);
if (fd < 0) {
*error_code = ANODE_ERR_UNABLE_TO_BIND;
return (AnodeSocket *)0;
}
fcntl(fd,F_SETFL,O_NONBLOCK);
Anode_zero(&sin4,sizeof(struct sockaddr_in));
sin4.sin_family = AF_INET;
sin4.sin_port = htons(local_port);
sin4.sin_addr.s_addr = *((uint32_t *)local_address->bits);
if (bind(fd,(const struct sockaddr *)&sin4,sizeof(sin4))) {
AnodeSystemTransport__close_socket(fd);
*error_code = ANODE_ERR_UNABLE_TO_BIND;
return (AnodeSocket *)0;
}
if (listen(fd,8)) {
AnodeSystemTransport__close_socket(fd);
*error_code = ANODE_ERR_UNABLE_TO_BIND;
return (AnodeSocket *)0;
}
break;
case ANODE_NETWORK_ADDRESS_IPV6:
fd = socket(AF_INET6,SOCK_STREAM,0);
if (fd < 0) {
*error_code = ANODE_ERR_UNABLE_TO_BIND;
return (AnodeSocket *)0;
}
fcntl(fd,F_SETFL,O_NONBLOCK);
#ifdef IPV6_V6ONLY
tmp = 1; setsockopt(fd,IPPROTO_IPV6,IPV6_V6ONLY,&tmp,sizeof(tmp));
#endif
Anode_zero(&sin6,sizeof(struct sockaddr_in6));
sin6.sin6_family = AF_INET6;
sin6.sin6_port = htons(local_port);
Anode_memcpy(sin6.sin6_addr.s6_addr,local_address->bits,16);
if (bind(fd,(const struct sockaddr *)&sin6,sizeof(sin6))) {
AnodeSystemTransport__close_socket(fd);
*error_code = ANODE_ERR_UNABLE_TO_BIND;
return (AnodeSocket *)0;
}
if (listen(fd,8)) {
AnodeSystemTransport__close_socket(fd);
*error_code = ANODE_ERR_UNABLE_TO_BIND;
return (AnodeSocket *)0;
}
break;
default:
if (THIS->base)
return THIS->base->stream_listen(THIS->base,local_address,local_port,error_code);
else {
*error_code = ANODE_ERR_ADDRESS_TYPE_NOT_SUPPORTED;
return (AnodeSocket *)0;
}
}
entry_idx = AnodeSystemTransport__add_entry(THIS);
sock = &(THIS->sockets[entry_idx]);
sock->base.type = ANODE_SOCKET_STREAM_LISTEN;
sock->base.state = ANODE_SOCKET_OPEN;
Anode_memcpy(&sock->base.endpoint.address,local_address,sizeof(AnodeNetworkAddress));
sock->base.endpoint.port = local_port;
sock->base.class_name = AnodeSystemTransport_CLASS;
sock->base.user_ptr[0] = NULL;
sock->base.user_ptr[1] = NULL;
sock->base.event_handler = NULL;
sock->entry_idx = entry_idx;
THIS->fds[entry_idx].fd = fd;
THIS->fds[entry_idx].events = POLLIN;
THIS->fds[entry_idx].revents = 0;
*error_code = 0;
return (AnodeSocket *)sock;
}
static int AnodeSystemTransport_datagram_send(AnodeTransport *transport,
AnodeSocket *sock,
const void *data,
int data_len,
const AnodeNetworkEndpoint *to_endpoint)
{
struct sockaddr_in sin4;
struct sockaddr_in6 sin6;
#ifdef ANODE_USE_SELECT
const int fd = (int)(THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx]);
#else
const int fd = THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx].fd;
#endif
switch(to_endpoint->address.type) {
case ANODE_NETWORK_ADDRESS_IPV4:
Anode_zero(&sin4,sizeof(struct sockaddr_in));
sin4.sin_family = AF_INET;
sin4.sin_port = htons((uint16_t)to_endpoint->port);
sin4.sin_addr.s_addr = *((uint32_t *)to_endpoint->address.bits);
sendto(fd,data,data_len,0,(struct sockaddr *)&sin4,sizeof(sin4));
return 0;
case ANODE_NETWORK_ADDRESS_IPV6:
Anode_zero(&sin6,sizeof(struct sockaddr_in6));
sin6.sin6_family = AF_INET6;
sin6.sin6_port = htons((uint16_t)to_endpoint->port);
Anode_memcpy(sin6.sin6_addr.s6_addr,to_endpoint->address.bits,16);
sendto(fd,data,data_len,0,(struct sockaddr *)&sin6,sizeof(sin6));
return 0;
default:
if (THIS->base)
return THIS->base->datagram_send(THIS->base,sock,data,data_len,to_endpoint);
else return ANODE_ERR_ADDRESS_TYPE_NOT_SUPPORTED;
}
}
static AnodeSocket *AnodeSystemTransport_stream_connect(AnodeTransport *transport,
const AnodeNetworkEndpoint *to_endpoint,
int *error_code)
{
struct sockaddr_in sin4;
struct sockaddr_in6 sin6;
struct AnodeSystemTransport_AnodeSocket *sock;
unsigned int entry_idx;
int fd;
switch(to_endpoint->address.type) {
case ANODE_NETWORK_ADDRESS_IPV4:
Anode_zero(&sin4,sizeof(struct sockaddr_in));
sin4.sin_family = AF_INET;
sin4.sin_port = htons(to_endpoint->port);
sin4.sin_addr.s_addr = *((uint32_t *)to_endpoint->address.bits);
fd = socket(AF_INET,SOCK_STREAM,0);
if (fd < 0) {
*error_code = ANODE_ERR_ADDRESS_TYPE_NOT_SUPPORTED;
return (AnodeSocket *)0;
}
fcntl(fd,F_SETFL,O_NONBLOCK);
if (connect(fd,(struct sockaddr *)&sin4,sizeof(sin4))) {
if (errno != EINPROGRESS) {
*error_code = ANODE_ERR_CONNECT_FAILED;
AnodeSystemTransport__close_socket(fd);
return (AnodeSocket *)0;
}
}
break;
case ANODE_NETWORK_ADDRESS_IPV6:
Anode_zero(&sin6,sizeof(struct sockaddr_in6));
sin6.sin6_family = AF_INET6;
sin6.sin6_port = htons(to_endpoint->port);
Anode_memcpy(sin6.sin6_addr.s6_addr,to_endpoint->address.bits,16);
fd = socket(AF_INET6,SOCK_STREAM,0);
if (fd < 0) {
*error_code = ANODE_ERR_ADDRESS_TYPE_NOT_SUPPORTED;
return (AnodeSocket *)0;
}
fcntl(fd,F_SETFL,O_NONBLOCK);
if (connect(fd,(struct sockaddr *)&sin6,sizeof(sin6))) {
if (errno == EINPROGRESS) {
*error_code = ANODE_ERR_CONNECT_FAILED;
AnodeSystemTransport__close_socket(fd);
return (AnodeSocket *)0;
}
}
break;
default:
if (THIS->base)
return THIS->base->stream_connect(THIS->base,to_endpoint,error_code);
else {
*error_code = ANODE_ERR_ADDRESS_TYPE_NOT_SUPPORTED;
return (AnodeSocket *)0;
}
}
entry_idx = AnodeSystemTransport__add_entry(THIS);
sock = &(THIS->sockets[entry_idx]);
sock->base.type = ANODE_SOCKET_STREAM_CONNECTION;
sock->base.state = ANODE_SOCKET_CONNECTING;
Anode_memcpy(&sock->base.endpoint,to_endpoint,sizeof(AnodeNetworkEndpoint));
sock->base.class_name = AnodeSystemTransport_CLASS;
sock->base.user_ptr[0] = NULL;
sock->base.user_ptr[1] = NULL;
sock->base.event_handler = NULL;
sock->entry_idx = entry_idx;
THIS->fds[entry_idx].fd = fd;
THIS->fds[entry_idx].events = POLLIN|POLLOUT;
THIS->fds[entry_idx].revents = 0;
return (AnodeSocket *)sock;
}
static void AnodeSystemTransport_stream_start_writing(AnodeTransport *transport,
AnodeSocket *sock)
{
if ((sock->type == ANODE_SOCKET_STREAM_CONNECTION)&&(((struct AnodeSystemTransport_AnodeSocket *)sock)->base.state == ANODE_SOCKET_OPEN)) {
if (sock->class_name == AnodeSystemTransport_CLASS) {
#ifdef ANODE_USE_SELECT
FD_SET((int)(THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx]),&THIS->writefds);
#else
THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx].events = (POLLIN|POLLOUT);
#endif
} else THIS->base->stream_start_writing(THIS->base,sock);
}
}
static void AnodeSystemTransport_stream_stop_writing(AnodeTransport *transport,
AnodeSocket *sock)
{
if ((sock->type == ANODE_SOCKET_STREAM_CONNECTION)&&(((struct AnodeSystemTransport_AnodeSocket *)sock)->base.state == ANODE_SOCKET_OPEN)) {
if (sock->class_name == AnodeSystemTransport_CLASS) {
#ifdef ANODE_USE_SELECT
FD_CLR((int)(THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx]),&THIS->writefds);
#else
THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx].events = POLLIN;
#endif
} else THIS->base->stream_stop_writing(THIS->base,sock);
}
}
static int AnodeSystemTransport_stream_send(AnodeTransport *transport,
AnodeSocket *sock,
const void *data,
int data_len)
{
int result;
if (sock->type == ANODE_SOCKET_STREAM_CONNECTION) {
if (sock->class_name == AnodeSystemTransport_CLASS) {
if (((struct AnodeSystemTransport_AnodeSocket *)sock)->base.state != ANODE_SOCKET_OPEN)
return ANODE_ERR_CONNECTION_CLOSED;
#ifdef ANODE_USE_SELECT
result = send((int)(THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx]),data,data_len,0);
#else
result = send(THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx].fd,data,data_len,0);
#endif
if (result >= 0)
return result;
else {
AnodeSystemTransport__do_close(THIS,(struct AnodeSystemTransport_AnodeSocket *)sock,ANODE_ERR_CONNECTION_CLOSED_BY_REMOTE,1);
return ANODE_ERR_CONNECTION_CLOSED;
}
} else return THIS->base->stream_send(THIS->base,sock,data,data_len);
} else return ANODE_ERR_INVALID_ARGUMENT;
}
static void AnodeSystemTransport_close(AnodeTransport *transport,
AnodeSocket *sock)
{
AnodeSystemTransport__do_close(THIS,(struct AnodeSystemTransport_AnodeSocket *)sock,0,1);
}
static void AnodeSystemTransport__poll_do_read_datagram(struct AnodeSystemTransport *transport,int fd,struct AnodeSystemTransport_AnodeSocket *sock)
{
char buf[16384];
struct sockaddr_storage fromaddr;
AnodeNetworkEndpoint tmp_ep;
AnodeEvent evbuf;
socklen_t addrlen;
int n;
addrlen = sizeof(struct sockaddr_storage);
n = recvfrom(fd,buf,sizeof(buf),0,(struct sockaddr *)&fromaddr,&addrlen);
if ((n >= 0)&&(AnodeSystemTransport__populate_network_endpoint(&fromaddr,&tmp_ep))) {
evbuf.type = ANODE_TRANSPORT_EVENT_DATAGRAM_RECEIVED;
evbuf.transport = (AnodeTransport *)transport;
evbuf.sock = (AnodeSocket *)sock;
evbuf.datagram_from = &tmp_ep;
evbuf.dns_name = NULL;
evbuf.dns_addresses = NULL;
evbuf.dns_address_count = 0;
evbuf.error_code = 0;
evbuf.data_length = n;
evbuf.data = buf;
if (sock->base.event_handler)
sock->base.event_handler(&evbuf);
else if (transport->default_event_handler)
transport->default_event_handler(&evbuf);
}
}
static void AnodeSystemTransport__poll_do_accept_incoming_connection(struct AnodeSystemTransport *transport,int fd,struct AnodeSystemTransport_AnodeSocket *sock)
{
struct sockaddr_storage fromaddr;
AnodeNetworkEndpoint tmp_ep;
AnodeEvent evbuf;
struct AnodeSystemTransport_AnodeSocket *newsock;
socklen_t addrlen;
int n;
unsigned int entry_idx;
addrlen = sizeof(struct sockaddr_storage);
n = accept(fd,(struct sockaddr *)&fromaddr,&addrlen);
if ((n >= 0)&&(AnodeSystemTransport__populate_network_endpoint(&fromaddr,&tmp_ep))) {
entry_idx = AnodeSystemTransport__add_entry(transport);
newsock = &(transport->sockets[entry_idx]);
newsock->base.type = ANODE_SOCKET_STREAM_CONNECTION;
newsock->base.state = ANODE_SOCKET_OPEN;
Anode_memcpy(&newsock->base.endpoint,&tmp_ep,sizeof(AnodeNetworkEndpoint));
newsock->base.class_name = AnodeSystemTransport_CLASS;
newsock->base.user_ptr[0] = NULL;
newsock->base.user_ptr[1] = NULL;
newsock->base.event_handler = NULL;
newsock->entry_idx = entry_idx;
THIS->fds[entry_idx].fd = n;
THIS->fds[entry_idx].events = POLLIN;
THIS->fds[entry_idx].revents = 0;
evbuf.type = ANODE_TRANSPORT_EVENT_STREAM_INCOMING_CONNECT;
evbuf.transport = (AnodeTransport *)transport;
evbuf.sock = (AnodeSocket *)newsock;
evbuf.datagram_from = NULL;
evbuf.dns_name = NULL;
evbuf.dns_addresses = NULL;
evbuf.dns_address_count = 0;
evbuf.error_code = 0;
evbuf.data_length = 0;
evbuf.data = NULL;
if (sock->base.event_handler)
sock->base.event_handler(&evbuf);
else if (transport->default_event_handler)
transport->default_event_handler(&evbuf);
}
}
static void AnodeSystemTransport__poll_do_read_stream(struct AnodeSystemTransport *transport,int fd,struct AnodeSystemTransport_AnodeSocket *sock)
{
char buf[65536];
AnodeEvent evbuf;
int n;
n = recv(fd,buf,sizeof(buf),0);
if (n > 0) {
evbuf.type = ANODE_TRANSPORT_EVENT_STREAM_DATA_RECEIVED;
evbuf.transport = (AnodeTransport *)transport;
evbuf.sock = (AnodeSocket *)sock;
evbuf.datagram_from = NULL;
evbuf.dns_name = NULL;
evbuf.dns_addresses = NULL;
evbuf.dns_address_count = 0;
evbuf.error_code = 0;
evbuf.data_length = n;
evbuf.data = buf;
if (sock->base.event_handler)
sock->base.event_handler(&evbuf);
else if (transport->default_event_handler)
transport->default_event_handler(&evbuf);
} else AnodeSystemTransport__do_close(transport,sock,ANODE_ERR_CONNECTION_CLOSED_BY_REMOTE,1);
}
static void AnodeSystemTransport__poll_do_stream_available_for_write(struct AnodeSystemTransport *transport,int fd,struct AnodeSystemTransport_AnodeSocket *sock)
{
AnodeEvent evbuf;
evbuf.type = ANODE_TRANSPORT_EVENT_STREAM_DATA_RECEIVED;
evbuf.transport = (AnodeTransport *)transport;
evbuf.sock = (AnodeSocket *)sock;
evbuf.datagram_from = NULL;
evbuf.dns_name = NULL;
evbuf.dns_addresses = NULL;
evbuf.dns_address_count = 0;
evbuf.error_code = 0;
evbuf.data_length = 0;
evbuf.data = NULL;
if (sock->base.event_handler)
sock->base.event_handler(&evbuf);
else if (transport->default_event_handler)
transport->default_event_handler(&evbuf);
}
static void AnodeSystemTransport__poll_do_outgoing_connect(struct AnodeSystemTransport *transport,int fd,struct AnodeSystemTransport_AnodeSocket *sock)
{
AnodeEvent evbuf;
int err_code;
socklen_t optlen;
optlen = sizeof(err_code);
if (getsockopt(fd,SOL_SOCKET,SO_ERROR,(void *)&err_code,&optlen)) {
/* Error getting result, so we assume a failure */
evbuf.type = ANODE_TRANSPORT_EVENT_STREAM_OUTGOING_CONNECT_FAILED;
evbuf.transport = (AnodeTransport *)transport;
evbuf.sock = (AnodeSocket *)sock;
evbuf.datagram_from = NULL;
evbuf.dns_name = NULL;
evbuf.dns_addresses = NULL;
evbuf.dns_address_count = 0;
evbuf.error_code = ANODE_ERR_CONNECT_FAILED;
evbuf.data_length = 0;
evbuf.data = NULL;
AnodeSystemTransport__do_close(transport,sock,0,0);
} else if (err_code) {
/* Error code is nonzero, so connect failed */
evbuf.type = ANODE_TRANSPORT_EVENT_STREAM_OUTGOING_CONNECT_FAILED;
evbuf.transport = (AnodeTransport *)transport;
evbuf.sock = (AnodeSocket *)sock;
evbuf.datagram_from = NULL;
evbuf.dns_name = NULL;
evbuf.dns_addresses = NULL;
evbuf.dns_address_count = 0;
evbuf.error_code = ANODE_ERR_CONNECT_FAILED;
evbuf.data_length = 0;
evbuf.data = NULL;
AnodeSystemTransport__do_close(transport,sock,0,0);
} else {
/* Connect succeeded */
evbuf.type = ANODE_TRANSPORT_EVENT_STREAM_OUTGOING_CONNECT_ESTABLISHED;
evbuf.transport = (AnodeTransport *)transport;
evbuf.sock = (AnodeSocket *)sock;
evbuf.datagram_from = NULL;
evbuf.dns_name = NULL;
evbuf.dns_addresses = NULL;
evbuf.dns_address_count = 0;
evbuf.error_code = 0;
evbuf.data_length = 0;
evbuf.data = NULL;
}
if (sock->base.event_handler)
sock->base.event_handler(&evbuf);
else if (transport->default_event_handler)
transport->default_event_handler(&evbuf);
}
static int AnodeSystemTransport_poll(AnodeTransport *transport)
{
int timeout = -1;
unsigned int fd_idx;
int event_count = 0;
int n;
if (poll((struct pollfd *)THIS->fds,THIS->fd_count,timeout) > 0) {
for(fd_idx=0;fd_idx<THIS->fd_count;++fd_idx) {
if ((THIS->fds[fd_idx].revents & (POLLERR|POLLHUP|POLLNVAL))) {
if (THIS->sockets[fd_idx].base.type == ANODE_SOCKET_STREAM_CONNECTION) {
if (THIS->sockets[fd_idx].base.state == ANODE_SOCKET_CONNECTING)
AnodeSystemTransport__poll_do_outgoing_connect(THIS,THIS->fds[fd_idx].fd,&THIS->sockets[fd_idx]);
else AnodeSystemTransport__do_close(THIS,&THIS->sockets[fd_idx],ANODE_ERR_CONNECTION_CLOSED_BY_REMOTE,1);
++event_count;
}
} else {
if ((THIS->fds[fd_idx].revents & POLLIN)) {
if (THIS->fds[fd_idx].fd == THIS->invoke_pipe[0]) {
n = read(THIS->invoke_pipe[0],&(((unsigned char *)(&(THIS->invoke_pipe_buf)))[THIS->invoke_pipe_buf_ptr]),sizeof(THIS->invoke_pipe_buf) - THIS->invoke_pipe_buf_ptr);
if (n > 0) {
THIS->invoke_pipe_buf_ptr += (unsigned int)n;
if (THIS->invoke_pipe_buf_ptr >= sizeof(THIS->invoke_pipe_buf)) {
THIS->invoke_pipe_buf_ptr -= sizeof(THIS->invoke_pipe_buf);
((void (*)(void *))(THIS->invoke_pipe_buf[1]))(THIS->invoke_pipe_buf[0]);
}
}
} else {
switch(THIS->sockets[fd_idx].base.type) {
case ANODE_SOCKET_DATAGRAM:
AnodeSystemTransport__poll_do_read_datagram(THIS,THIS->fds[fd_idx].fd,&THIS->sockets[fd_idx]);
break;
case ANODE_SOCKET_STREAM_LISTEN:
AnodeSystemTransport__poll_do_accept_incoming_connection(THIS,THIS->fds[fd_idx].fd,&THIS->sockets[fd_idx]);
break;
case ANODE_SOCKET_STREAM_CONNECTION:
if (THIS->sockets[fd_idx].base.state == ANODE_SOCKET_CONNECTING)
AnodeSystemTransport__poll_do_outgoing_connect(THIS,THIS->fds[fd_idx].fd,&THIS->sockets[fd_idx]);
else AnodeSystemTransport__poll_do_read_stream(THIS,THIS->fds[fd_idx].fd,&THIS->sockets[fd_idx]);
break;
}
++event_count;
}
}
if ((THIS->fds[fd_idx].revents & POLLOUT)) {
if (THIS->sockets[fd_idx].base.state == ANODE_SOCKET_CONNECTING)
AnodeSystemTransport__poll_do_outgoing_connect(THIS,THIS->fds[fd_idx].fd,&THIS->sockets[fd_idx]);
else AnodeSystemTransport__poll_do_stream_available_for_write(THIS,THIS->fds[fd_idx].fd,&THIS->sockets[fd_idx]);
++event_count;
}
}
}
}
return event_count;
}
static int AnodeSystemTransport_supports_address_type(const AnodeTransport *transport,
enum AnodeNetworkAddressType at)
{
switch(at) {
case ANODE_NETWORK_ADDRESS_IPV4:
return 1;
case ANODE_NETWORK_ADDRESS_IPV6:
return 1;
default:
if (THIS->base)
return THIS->base->supports_address_type(THIS->base,at);
return 0;
}
}
static AnodeTransport *AnodeSystemTransport_base_instance(const AnodeTransport *transport)
{
return THIS->base;
}
static const char *AnodeSystemTransport_class_name(AnodeTransport *transport)
{
return AnodeSystemTransport_CLASS;
}
static void AnodeSystemTransport_delete(AnodeTransport *transport)
{
close(THIS->invoke_pipe[0]);
close(THIS->invoke_pipe[1]);
AnodeMutex_destroy(&THIS->invoke_pipe_m);
if (THIS->fds) free(THIS->fds);
if (THIS->sockets) free(THIS->sockets);
if (THIS->base) THIS->base->delete(THIS->base);
free(transport);
}
/* ======================================================================== */
AnodeTransport *AnodeSystemTransport_new(AnodeTransport *base)
{
struct AnodeSystemTransport *t;
unsigned int entry_idx;
t = malloc(sizeof(struct AnodeSystemTransport));
if (!t) return (AnodeTransport *)0;
Anode_zero(t,sizeof(struct AnodeSystemTransport));
t->interface.invoke = &AnodeSystemTransport_invoke;
t->interface.dns_resolve = &AnodeSystemTransport_dns_resolve;
t->interface.datagram_listen = &AnodeSystemTransport_datagram_listen;
t->interface.stream_listen = &AnodeSystemTransport_stream_listen;
t->interface.datagram_send = &AnodeSystemTransport_datagram_send;
t->interface.stream_connect = &AnodeSystemTransport_stream_connect;
t->interface.stream_start_writing = &AnodeSystemTransport_stream_start_writing;
t->interface.stream_stop_writing = &AnodeSystemTransport_stream_stop_writing;
t->interface.stream_send = &AnodeSystemTransport_stream_send;
t->interface.close = &AnodeSystemTransport_close;
t->interface.poll = &AnodeSystemTransport_poll;
t->interface.supports_address_type = &AnodeSystemTransport_supports_address_type;
t->interface.base_instance = &AnodeSystemTransport_base_instance;
t->interface.class_name = &AnodeSystemTransport_class_name;
t->interface.delete = &AnodeSystemTransport_delete;
t->base = base;
pipe(t->invoke_pipe);
fcntl(t->invoke_pipe[0],F_SETFL,O_NONBLOCK);
entry_idx = AnodeSystemTransport__add_entry(t);
t->fds[entry_idx].fd = t->invoke_pipe[0];
t->fds[entry_idx].events = POLLIN;
t->fds[entry_idx].revents = 0;
AnodeMutex_init(&t->invoke_pipe_m);
return (AnodeTransport *)t;
}