/* libanode: the Anode C reference implementation * Copyright (C) 2009-2010 Adam Ierymenko * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include #include #include #include #include #include #include #include "anode.h" #include "impl/mutex.h" #include "impl/thread.h" #include "impl/misc.h" #include "impl/dns_txt.h" #ifdef WINDOWS #include #include #define AnodeSystemTransport__close_socket(s) closesocket((s)) #define ANODE_USE_SELECT 1 #else #include #include #define AnodeSystemTransport__close_socket(s) close((s)) #endif static const char *AnodeSystemTransport_CLASS = "SystemTransport"; /* ======================================================================== */ struct AnodeSystemTransport; struct AnodeSystemTransport_AnodeSocket { AnodeSocket base; /* must be first */ unsigned int entry_idx; }; #define ANODE_SYSTEM_TRANSPORT_DNS_MAX_RESULTS 16 struct AnodeSystemTransport__dns_request { struct AnodeSystemTransport__dns_request *next; AnodeThread *thread; struct AnodeSystemTransport *owner; void (*event_handler)(const AnodeEvent *event); char name[256]; enum AnodeTransportDnsIncludeMode ipv4_include_mode; enum AnodeTransportDnsIncludeMode ipv6_include_mode; enum AnodeTransportDnsIncludeMode anode_include_mode; AnodeNetworkAddress addresses[ANODE_SYSTEM_TRANSPORT_DNS_MAX_RESULTS]; unsigned int address_count; int error_code; }; #ifdef ANODE_USE_SELECT typedef int AnodeSystemTransport__poll_fd; /* for select() */ #else typedef struct pollfd AnodeSystemTransport__poll_fd; /* for poll() */ #endif struct AnodeSystemTransport { AnodeTransport interface; /* must be first */ AnodeTransport *base; #ifdef ANODE_USE_SELECT FD_SET readfds; FD_SET writefds; #endif void (*default_event_handler)(const AnodeEvent *event); AnodeSystemTransport__poll_fd *fds; struct AnodeSystemTransport_AnodeSocket *sockets; unsigned int fd_count; unsigned int fd_capacity; struct AnodeSystemTransport__dns_request *pending_dns_requests; int invoke_pipe[2]; AnodeMutex invoke_pipe_m; void *invoke_pipe_buf[2]; unsigned int invoke_pipe_buf_ptr; }; /* ======================================================================== */ /* Internal helper methods */ static unsigned int AnodeSystemTransport__add_entry(struct AnodeSystemTransport *transport) { if ((transport->fd_count + 1) > transport->fd_capacity) { transport->fd_capacity += 8; transport->fds = realloc(transport->fds,sizeof(AnodeSystemTransport__poll_fd) * transport->fd_capacity); transport->sockets = realloc(transport->sockets,sizeof(struct AnodeSystemTransport_AnodeSocket) * transport->fd_capacity); } return transport->fd_count++; } static void AnodeSystemTransport__remove_entry(struct AnodeSystemTransport *transport,const unsigned int idx) { unsigned int i; --transport->fd_count; for(i=idx;ifd_count;++i) { Anode_memcpy(&transport->fds[i],&transport->fds[i+1],sizeof(AnodeSystemTransport__poll_fd)); Anode_memcpy(&transport->sockets[i],&transport->sockets[i+1],sizeof(struct AnodeSystemTransport_AnodeSocket)); } if ((transport->fd_capacity - transport->fd_count) > 16) { transport->fd_capacity -= 16; transport->fds = realloc(transport->fds,sizeof(AnodeSystemTransport__poll_fd) * transport->fd_capacity); transport->sockets = realloc(transport->sockets,sizeof(struct AnodeSystemTransport_AnodeSocket) * transport->fd_capacity); } } static void AnodeSystemTransport__dns_invoke_on_completion(void *_dreq) { struct AnodeSystemTransport__dns_request *dreq = (struct AnodeSystemTransport__dns_request *)_dreq; struct AnodeSystemTransport__dns_request *ptr,**lastnext; AnodeThread_join(dreq->thread); ptr = dreq->owner->pending_dns_requests; lastnext = &dreq->owner->pending_dns_requests; while (ptr) { if (ptr == dreq) { *lastnext = ptr->next; break; } else { lastnext = &ptr->next; ptr = ptr->next; } } free(dreq); } static void AnodeSystemTransport__dns_thread_main(void *_dreq) { struct AnodeSystemTransport__dns_request *dreq = (struct AnodeSystemTransport__dns_request *)_dreq; dreq->owner->interface.invoke((AnodeTransport *)dreq->owner,dreq,&AnodeSystemTransport__dns_invoke_on_completion); } static void AnodeSystemTransport__do_close(struct AnodeSystemTransport *transport,struct AnodeSystemTransport_AnodeSocket *sock,const int error_code,const int generate_event) { AnodeEvent evbuf; int fd; if (sock->base.class_name == AnodeSystemTransport_CLASS) { #ifdef ANODE_USE_SELECT fd = (int)(transport->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx]); #else fd = transport->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx].fd; #endif if ((sock->base.type == ANODE_SOCKET_STREAM_CONNECTION)&&(sock->base.state != ANODE_SOCKET_CLOSED)) { sock->base.state = ANODE_SOCKET_CLOSED; if (generate_event) { evbuf.type = ANODE_TRANSPORT_EVENT_STREAM_CLOSED; evbuf.transport = (AnodeTransport *)transport; evbuf.sock = (AnodeSocket *)sock; evbuf.datagram_from = NULL; evbuf.dns_name = NULL; evbuf.dns_addresses = NULL; evbuf.dns_address_count = 0; evbuf.error_code = error_code; evbuf.data_length = 0; evbuf.data = NULL; if (sock->base.event_handler) sock->base.event_handler(&evbuf); else if (transport->default_event_handler) transport->default_event_handler(&evbuf); } } AnodeSystemTransport__close_socket(fd); AnodeSystemTransport__remove_entry(transport,((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx); #ifdef ANODE_USE_SELECT FD_CLR(sock,&THIS->readfds); FD_CLR(sock,&THIS->writefds); #endif } else transport->base->close(transport->base,(AnodeSocket *)sock); } static int AnodeSystemTransport__populate_network_endpoint(const struct sockaddr_storage *saddr,AnodeNetworkEndpoint *ep) { switch(saddr->ss_family) { case AF_INET: ep->address.type = ANODE_NETWORK_ADDRESS_IPV4; *((uint32_t *)ep->address.bits) = ((struct sockaddr_in *)saddr)->sin_addr.s_addr; ep->port = ntohs(((struct sockaddr_in *)saddr)->sin_port); return 1; case AF_INET6: ep->address.type = ANODE_NETWORK_ADDRESS_IPV6; Anode_memcpy(ep->address.bits,((struct sockaddr_in6 *)saddr)->sin6_addr.s6_addr,16); ep->port = ntohs(((struct sockaddr_in6 *)saddr)->sin6_port); return 1; } return 0; } /* ======================================================================== */ #ifdef THIS #undef THIS #endif #define THIS ((struct AnodeSystemTransport *)transport) static void AnodeSystemTransport_invoke(AnodeTransport *transport, void *ptr, void (*func)(void *)) { void *invoke_msg[2]; invoke_msg[0] = ptr; invoke_msg[1] = (void *)func; AnodeMutex_lock(&THIS->invoke_pipe_m); write(THIS->invoke_pipe[1],(void *)(&invoke_msg),sizeof(invoke_msg)); AnodeMutex_unlock(&THIS->invoke_pipe_m); } static void AnodeSystemTransport_dns_resolve(AnodeTransport *transport, const char *name, void (*event_handler)(const AnodeEvent *), enum AnodeTransportDnsIncludeMode ipv4_include_mode, enum AnodeTransportDnsIncludeMode ipv6_include_mode, enum AnodeTransportDnsIncludeMode anode_include_mode) { struct AnodeSystemTransport__dns_request *dreq = malloc(sizeof(struct AnodeSystemTransport__dns_request)); dreq->owner = THIS; dreq->event_handler = event_handler; Anode_str_copy(dreq->name,name,sizeof(dreq->name)); dreq->ipv4_include_mode = ipv4_include_mode; dreq->ipv6_include_mode = ipv6_include_mode; dreq->anode_include_mode = anode_include_mode; dreq->address_count = 0; dreq->error_code = 0; dreq->next = THIS->pending_dns_requests; THIS->pending_dns_requests = dreq; dreq->thread = AnodeThread_create(&AnodeSystemTransport__dns_thread_main,dreq,0); } static AnodeSocket *AnodeSystemTransport_datagram_listen(AnodeTransport *transport, const AnodeNetworkAddress *local_address, int local_port, int *error_code) { struct sockaddr_in sin4; struct sockaddr_in6 sin6; struct AnodeSystemTransport_AnodeSocket *sock; unsigned int entry_idx; int fd; int tmp; switch(local_address->type) { case ANODE_NETWORK_ADDRESS_IPV4: fd = socket(AF_INET,SOCK_DGRAM,0); if (fd <= 0) { *error_code = ANODE_ERR_UNABLE_TO_BIND; return (AnodeSocket *)0; } tmp = 1; setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&tmp,sizeof(tmp)); fcntl(fd,F_SETFL,O_NONBLOCK); Anode_zero(&sin4,sizeof(struct sockaddr_in)); sin4.sin_family = AF_INET; sin4.sin_port = htons(local_port); sin4.sin_addr.s_addr = *((uint32_t *)local_address->bits); if (bind(fd,(const struct sockaddr *)&sin4,sizeof(sin4))) { AnodeSystemTransport__close_socket(fd); *error_code = ANODE_ERR_UNABLE_TO_BIND; return (AnodeSocket *)0; } break; case ANODE_NETWORK_ADDRESS_IPV6: fd = socket(AF_INET6,SOCK_DGRAM,0); if (fd <= 0) { *error_code = ANODE_ERR_UNABLE_TO_BIND; return (AnodeSocket *)0; } tmp = 1; setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&tmp,sizeof(tmp)); fcntl(fd,F_SETFL,O_NONBLOCK); #ifdef IPV6_V6ONLY tmp = 1; setsockopt(fd,IPPROTO_IPV6,IPV6_V6ONLY,&tmp,sizeof(tmp)); #endif Anode_zero(&sin6,sizeof(struct sockaddr_in6)); sin6.sin6_family = AF_INET6; sin6.sin6_port = htons(local_port); Anode_memcpy(sin6.sin6_addr.s6_addr,local_address->bits,16); if (bind(fd,(const struct sockaddr *)&sin6,sizeof(sin6))) { AnodeSystemTransport__close_socket(fd); *error_code = ANODE_ERR_UNABLE_TO_BIND; return (AnodeSocket *)0; } break; default: if (THIS->base) return THIS->base->datagram_listen(THIS->base,local_address,local_port,error_code); else { *error_code = ANODE_ERR_ADDRESS_TYPE_NOT_SUPPORTED; return (AnodeSocket *)0; } } entry_idx = AnodeSystemTransport__add_entry(THIS); sock = &(THIS->sockets[entry_idx]); sock->base.type = ANODE_SOCKET_DATAGRAM; sock->base.state = ANODE_SOCKET_OPEN; Anode_memcpy(&sock->base.endpoint.address,local_address,sizeof(AnodeNetworkAddress)); sock->base.endpoint.port = local_port; sock->base.class_name = AnodeSystemTransport_CLASS; sock->base.user_ptr[0] = NULL; sock->base.user_ptr[1] = NULL; sock->base.event_handler = NULL; sock->entry_idx = entry_idx; THIS->fds[entry_idx].fd = fd; THIS->fds[entry_idx].events = POLLIN; THIS->fds[entry_idx].revents = 0; *error_code = 0; return (AnodeSocket *)sock; } static AnodeSocket *AnodeSystemTransport_stream_listen(AnodeTransport *transport, const AnodeNetworkAddress *local_address, int local_port, int *error_code) { struct sockaddr_in sin4; struct sockaddr_in6 sin6; struct AnodeSystemTransport_AnodeSocket *sock; unsigned int entry_idx; int fd; int tmp; switch(local_address->type) { case ANODE_NETWORK_ADDRESS_IPV4: fd = socket(AF_INET,SOCK_STREAM,0); if (fd < 0) { *error_code = ANODE_ERR_UNABLE_TO_BIND; return (AnodeSocket *)0; } fcntl(fd,F_SETFL,O_NONBLOCK); Anode_zero(&sin4,sizeof(struct sockaddr_in)); sin4.sin_family = AF_INET; sin4.sin_port = htons(local_port); sin4.sin_addr.s_addr = *((uint32_t *)local_address->bits); if (bind(fd,(const struct sockaddr *)&sin4,sizeof(sin4))) { AnodeSystemTransport__close_socket(fd); *error_code = ANODE_ERR_UNABLE_TO_BIND; return (AnodeSocket *)0; } if (listen(fd,8)) { AnodeSystemTransport__close_socket(fd); *error_code = ANODE_ERR_UNABLE_TO_BIND; return (AnodeSocket *)0; } break; case ANODE_NETWORK_ADDRESS_IPV6: fd = socket(AF_INET6,SOCK_STREAM,0); if (fd < 0) { *error_code = ANODE_ERR_UNABLE_TO_BIND; return (AnodeSocket *)0; } fcntl(fd,F_SETFL,O_NONBLOCK); #ifdef IPV6_V6ONLY tmp = 1; setsockopt(fd,IPPROTO_IPV6,IPV6_V6ONLY,&tmp,sizeof(tmp)); #endif Anode_zero(&sin6,sizeof(struct sockaddr_in6)); sin6.sin6_family = AF_INET6; sin6.sin6_port = htons(local_port); Anode_memcpy(sin6.sin6_addr.s6_addr,local_address->bits,16); if (bind(fd,(const struct sockaddr *)&sin6,sizeof(sin6))) { AnodeSystemTransport__close_socket(fd); *error_code = ANODE_ERR_UNABLE_TO_BIND; return (AnodeSocket *)0; } if (listen(fd,8)) { AnodeSystemTransport__close_socket(fd); *error_code = ANODE_ERR_UNABLE_TO_BIND; return (AnodeSocket *)0; } break; default: if (THIS->base) return THIS->base->stream_listen(THIS->base,local_address,local_port,error_code); else { *error_code = ANODE_ERR_ADDRESS_TYPE_NOT_SUPPORTED; return (AnodeSocket *)0; } } entry_idx = AnodeSystemTransport__add_entry(THIS); sock = &(THIS->sockets[entry_idx]); sock->base.type = ANODE_SOCKET_STREAM_LISTEN; sock->base.state = ANODE_SOCKET_OPEN; Anode_memcpy(&sock->base.endpoint.address,local_address,sizeof(AnodeNetworkAddress)); sock->base.endpoint.port = local_port; sock->base.class_name = AnodeSystemTransport_CLASS; sock->base.user_ptr[0] = NULL; sock->base.user_ptr[1] = NULL; sock->base.event_handler = NULL; sock->entry_idx = entry_idx; THIS->fds[entry_idx].fd = fd; THIS->fds[entry_idx].events = POLLIN; THIS->fds[entry_idx].revents = 0; *error_code = 0; return (AnodeSocket *)sock; } static int AnodeSystemTransport_datagram_send(AnodeTransport *transport, AnodeSocket *sock, const void *data, int data_len, const AnodeNetworkEndpoint *to_endpoint) { struct sockaddr_in sin4; struct sockaddr_in6 sin6; #ifdef ANODE_USE_SELECT const int fd = (int)(THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx]); #else const int fd = THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx].fd; #endif switch(to_endpoint->address.type) { case ANODE_NETWORK_ADDRESS_IPV4: Anode_zero(&sin4,sizeof(struct sockaddr_in)); sin4.sin_family = AF_INET; sin4.sin_port = htons((uint16_t)to_endpoint->port); sin4.sin_addr.s_addr = *((uint32_t *)to_endpoint->address.bits); sendto(fd,data,data_len,0,(struct sockaddr *)&sin4,sizeof(sin4)); return 0; case ANODE_NETWORK_ADDRESS_IPV6: Anode_zero(&sin6,sizeof(struct sockaddr_in6)); sin6.sin6_family = AF_INET6; sin6.sin6_port = htons((uint16_t)to_endpoint->port); Anode_memcpy(sin6.sin6_addr.s6_addr,to_endpoint->address.bits,16); sendto(fd,data,data_len,0,(struct sockaddr *)&sin6,sizeof(sin6)); return 0; default: if (THIS->base) return THIS->base->datagram_send(THIS->base,sock,data,data_len,to_endpoint); else return ANODE_ERR_ADDRESS_TYPE_NOT_SUPPORTED; } } static AnodeSocket *AnodeSystemTransport_stream_connect(AnodeTransport *transport, const AnodeNetworkEndpoint *to_endpoint, int *error_code) { struct sockaddr_in sin4; struct sockaddr_in6 sin6; struct AnodeSystemTransport_AnodeSocket *sock; unsigned int entry_idx; int fd; switch(to_endpoint->address.type) { case ANODE_NETWORK_ADDRESS_IPV4: Anode_zero(&sin4,sizeof(struct sockaddr_in)); sin4.sin_family = AF_INET; sin4.sin_port = htons(to_endpoint->port); sin4.sin_addr.s_addr = *((uint32_t *)to_endpoint->address.bits); fd = socket(AF_INET,SOCK_STREAM,0); if (fd < 0) { *error_code = ANODE_ERR_ADDRESS_TYPE_NOT_SUPPORTED; return (AnodeSocket *)0; } fcntl(fd,F_SETFL,O_NONBLOCK); if (connect(fd,(struct sockaddr *)&sin4,sizeof(sin4))) { if (errno != EINPROGRESS) { *error_code = ANODE_ERR_CONNECT_FAILED; AnodeSystemTransport__close_socket(fd); return (AnodeSocket *)0; } } break; case ANODE_NETWORK_ADDRESS_IPV6: Anode_zero(&sin6,sizeof(struct sockaddr_in6)); sin6.sin6_family = AF_INET6; sin6.sin6_port = htons(to_endpoint->port); Anode_memcpy(sin6.sin6_addr.s6_addr,to_endpoint->address.bits,16); fd = socket(AF_INET6,SOCK_STREAM,0); if (fd < 0) { *error_code = ANODE_ERR_ADDRESS_TYPE_NOT_SUPPORTED; return (AnodeSocket *)0; } fcntl(fd,F_SETFL,O_NONBLOCK); if (connect(fd,(struct sockaddr *)&sin6,sizeof(sin6))) { if (errno == EINPROGRESS) { *error_code = ANODE_ERR_CONNECT_FAILED; AnodeSystemTransport__close_socket(fd); return (AnodeSocket *)0; } } break; default: if (THIS->base) return THIS->base->stream_connect(THIS->base,to_endpoint,error_code); else { *error_code = ANODE_ERR_ADDRESS_TYPE_NOT_SUPPORTED; return (AnodeSocket *)0; } } entry_idx = AnodeSystemTransport__add_entry(THIS); sock = &(THIS->sockets[entry_idx]); sock->base.type = ANODE_SOCKET_STREAM_CONNECTION; sock->base.state = ANODE_SOCKET_CONNECTING; Anode_memcpy(&sock->base.endpoint,to_endpoint,sizeof(AnodeNetworkEndpoint)); sock->base.class_name = AnodeSystemTransport_CLASS; sock->base.user_ptr[0] = NULL; sock->base.user_ptr[1] = NULL; sock->base.event_handler = NULL; sock->entry_idx = entry_idx; THIS->fds[entry_idx].fd = fd; THIS->fds[entry_idx].events = POLLIN|POLLOUT; THIS->fds[entry_idx].revents = 0; return (AnodeSocket *)sock; } static void AnodeSystemTransport_stream_start_writing(AnodeTransport *transport, AnodeSocket *sock) { if ((sock->type == ANODE_SOCKET_STREAM_CONNECTION)&&(((struct AnodeSystemTransport_AnodeSocket *)sock)->base.state == ANODE_SOCKET_OPEN)) { if (sock->class_name == AnodeSystemTransport_CLASS) { #ifdef ANODE_USE_SELECT FD_SET((int)(THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx]),&THIS->writefds); #else THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx].events = (POLLIN|POLLOUT); #endif } else THIS->base->stream_start_writing(THIS->base,sock); } } static void AnodeSystemTransport_stream_stop_writing(AnodeTransport *transport, AnodeSocket *sock) { if ((sock->type == ANODE_SOCKET_STREAM_CONNECTION)&&(((struct AnodeSystemTransport_AnodeSocket *)sock)->base.state == ANODE_SOCKET_OPEN)) { if (sock->class_name == AnodeSystemTransport_CLASS) { #ifdef ANODE_USE_SELECT FD_CLR((int)(THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx]),&THIS->writefds); #else THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx].events = POLLIN; #endif } else THIS->base->stream_stop_writing(THIS->base,sock); } } static int AnodeSystemTransport_stream_send(AnodeTransport *transport, AnodeSocket *sock, const void *data, int data_len) { int result; if (sock->type == ANODE_SOCKET_STREAM_CONNECTION) { if (sock->class_name == AnodeSystemTransport_CLASS) { if (((struct AnodeSystemTransport_AnodeSocket *)sock)->base.state != ANODE_SOCKET_OPEN) return ANODE_ERR_CONNECTION_CLOSED; #ifdef ANODE_USE_SELECT result = send((int)(THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx]),data,data_len,0); #else result = send(THIS->fds[((struct AnodeSystemTransport_AnodeSocket *)sock)->entry_idx].fd,data,data_len,0); #endif if (result >= 0) return result; else { AnodeSystemTransport__do_close(THIS,(struct AnodeSystemTransport_AnodeSocket *)sock,ANODE_ERR_CONNECTION_CLOSED_BY_REMOTE,1); return ANODE_ERR_CONNECTION_CLOSED; } } else return THIS->base->stream_send(THIS->base,sock,data,data_len); } else return ANODE_ERR_INVALID_ARGUMENT; } static void AnodeSystemTransport_close(AnodeTransport *transport, AnodeSocket *sock) { AnodeSystemTransport__do_close(THIS,(struct AnodeSystemTransport_AnodeSocket *)sock,0,1); } static void AnodeSystemTransport__poll_do_read_datagram(struct AnodeSystemTransport *transport,int fd,struct AnodeSystemTransport_AnodeSocket *sock) { char buf[16384]; struct sockaddr_storage fromaddr; AnodeNetworkEndpoint tmp_ep; AnodeEvent evbuf; socklen_t addrlen; int n; addrlen = sizeof(struct sockaddr_storage); n = recvfrom(fd,buf,sizeof(buf),0,(struct sockaddr *)&fromaddr,&addrlen); if ((n >= 0)&&(AnodeSystemTransport__populate_network_endpoint(&fromaddr,&tmp_ep))) { evbuf.type = ANODE_TRANSPORT_EVENT_DATAGRAM_RECEIVED; evbuf.transport = (AnodeTransport *)transport; evbuf.sock = (AnodeSocket *)sock; evbuf.datagram_from = &tmp_ep; evbuf.dns_name = NULL; evbuf.dns_addresses = NULL; evbuf.dns_address_count = 0; evbuf.error_code = 0; evbuf.data_length = n; evbuf.data = buf; if (sock->base.event_handler) sock->base.event_handler(&evbuf); else if (transport->default_event_handler) transport->default_event_handler(&evbuf); } } static void AnodeSystemTransport__poll_do_accept_incoming_connection(struct AnodeSystemTransport *transport,int fd,struct AnodeSystemTransport_AnodeSocket *sock) { struct sockaddr_storage fromaddr; AnodeNetworkEndpoint tmp_ep; AnodeEvent evbuf; struct AnodeSystemTransport_AnodeSocket *newsock; socklen_t addrlen; int n; unsigned int entry_idx; addrlen = sizeof(struct sockaddr_storage); n = accept(fd,(struct sockaddr *)&fromaddr,&addrlen); if ((n >= 0)&&(AnodeSystemTransport__populate_network_endpoint(&fromaddr,&tmp_ep))) { entry_idx = AnodeSystemTransport__add_entry(transport); newsock = &(transport->sockets[entry_idx]); newsock->base.type = ANODE_SOCKET_STREAM_CONNECTION; newsock->base.state = ANODE_SOCKET_OPEN; Anode_memcpy(&newsock->base.endpoint,&tmp_ep,sizeof(AnodeNetworkEndpoint)); newsock->base.class_name = AnodeSystemTransport_CLASS; newsock->base.user_ptr[0] = NULL; newsock->base.user_ptr[1] = NULL; newsock->base.event_handler = NULL; newsock->entry_idx = entry_idx; THIS->fds[entry_idx].fd = n; THIS->fds[entry_idx].events = POLLIN; THIS->fds[entry_idx].revents = 0; evbuf.type = ANODE_TRANSPORT_EVENT_STREAM_INCOMING_CONNECT; evbuf.transport = (AnodeTransport *)transport; evbuf.sock = (AnodeSocket *)newsock; evbuf.datagram_from = NULL; evbuf.dns_name = NULL; evbuf.dns_addresses = NULL; evbuf.dns_address_count = 0; evbuf.error_code = 0; evbuf.data_length = 0; evbuf.data = NULL; if (sock->base.event_handler) sock->base.event_handler(&evbuf); else if (transport->default_event_handler) transport->default_event_handler(&evbuf); } } static void AnodeSystemTransport__poll_do_read_stream(struct AnodeSystemTransport *transport,int fd,struct AnodeSystemTransport_AnodeSocket *sock) { char buf[65536]; AnodeEvent evbuf; int n; n = recv(fd,buf,sizeof(buf),0); if (n > 0) { evbuf.type = ANODE_TRANSPORT_EVENT_STREAM_DATA_RECEIVED; evbuf.transport = (AnodeTransport *)transport; evbuf.sock = (AnodeSocket *)sock; evbuf.datagram_from = NULL; evbuf.dns_name = NULL; evbuf.dns_addresses = NULL; evbuf.dns_address_count = 0; evbuf.error_code = 0; evbuf.data_length = n; evbuf.data = buf; if (sock->base.event_handler) sock->base.event_handler(&evbuf); else if (transport->default_event_handler) transport->default_event_handler(&evbuf); } else AnodeSystemTransport__do_close(transport,sock,ANODE_ERR_CONNECTION_CLOSED_BY_REMOTE,1); } static void AnodeSystemTransport__poll_do_stream_available_for_write(struct AnodeSystemTransport *transport,int fd,struct AnodeSystemTransport_AnodeSocket *sock) { AnodeEvent evbuf; evbuf.type = ANODE_TRANSPORT_EVENT_STREAM_DATA_RECEIVED; evbuf.transport = (AnodeTransport *)transport; evbuf.sock = (AnodeSocket *)sock; evbuf.datagram_from = NULL; evbuf.dns_name = NULL; evbuf.dns_addresses = NULL; evbuf.dns_address_count = 0; evbuf.error_code = 0; evbuf.data_length = 0; evbuf.data = NULL; if (sock->base.event_handler) sock->base.event_handler(&evbuf); else if (transport->default_event_handler) transport->default_event_handler(&evbuf); } static void AnodeSystemTransport__poll_do_outgoing_connect(struct AnodeSystemTransport *transport,int fd,struct AnodeSystemTransport_AnodeSocket *sock) { AnodeEvent evbuf; int err_code; socklen_t optlen; optlen = sizeof(err_code); if (getsockopt(fd,SOL_SOCKET,SO_ERROR,(void *)&err_code,&optlen)) { /* Error getting result, so we assume a failure */ evbuf.type = ANODE_TRANSPORT_EVENT_STREAM_OUTGOING_CONNECT_FAILED; evbuf.transport = (AnodeTransport *)transport; evbuf.sock = (AnodeSocket *)sock; evbuf.datagram_from = NULL; evbuf.dns_name = NULL; evbuf.dns_addresses = NULL; evbuf.dns_address_count = 0; evbuf.error_code = ANODE_ERR_CONNECT_FAILED; evbuf.data_length = 0; evbuf.data = NULL; AnodeSystemTransport__do_close(transport,sock,0,0); } else if (err_code) { /* Error code is nonzero, so connect failed */ evbuf.type = ANODE_TRANSPORT_EVENT_STREAM_OUTGOING_CONNECT_FAILED; evbuf.transport = (AnodeTransport *)transport; evbuf.sock = (AnodeSocket *)sock; evbuf.datagram_from = NULL; evbuf.dns_name = NULL; evbuf.dns_addresses = NULL; evbuf.dns_address_count = 0; evbuf.error_code = ANODE_ERR_CONNECT_FAILED; evbuf.data_length = 0; evbuf.data = NULL; AnodeSystemTransport__do_close(transport,sock,0,0); } else { /* Connect succeeded */ evbuf.type = ANODE_TRANSPORT_EVENT_STREAM_OUTGOING_CONNECT_ESTABLISHED; evbuf.transport = (AnodeTransport *)transport; evbuf.sock = (AnodeSocket *)sock; evbuf.datagram_from = NULL; evbuf.dns_name = NULL; evbuf.dns_addresses = NULL; evbuf.dns_address_count = 0; evbuf.error_code = 0; evbuf.data_length = 0; evbuf.data = NULL; } if (sock->base.event_handler) sock->base.event_handler(&evbuf); else if (transport->default_event_handler) transport->default_event_handler(&evbuf); } static int AnodeSystemTransport_poll(AnodeTransport *transport) { int timeout = -1; unsigned int fd_idx; int event_count = 0; int n; if (poll((struct pollfd *)THIS->fds,THIS->fd_count,timeout) > 0) { for(fd_idx=0;fd_idxfd_count;++fd_idx) { if ((THIS->fds[fd_idx].revents & (POLLERR|POLLHUP|POLLNVAL))) { if (THIS->sockets[fd_idx].base.type == ANODE_SOCKET_STREAM_CONNECTION) { if (THIS->sockets[fd_idx].base.state == ANODE_SOCKET_CONNECTING) AnodeSystemTransport__poll_do_outgoing_connect(THIS,THIS->fds[fd_idx].fd,&THIS->sockets[fd_idx]); else AnodeSystemTransport__do_close(THIS,&THIS->sockets[fd_idx],ANODE_ERR_CONNECTION_CLOSED_BY_REMOTE,1); ++event_count; } } else { if ((THIS->fds[fd_idx].revents & POLLIN)) { if (THIS->fds[fd_idx].fd == THIS->invoke_pipe[0]) { n = read(THIS->invoke_pipe[0],&(((unsigned char *)(&(THIS->invoke_pipe_buf)))[THIS->invoke_pipe_buf_ptr]),sizeof(THIS->invoke_pipe_buf) - THIS->invoke_pipe_buf_ptr); if (n > 0) { THIS->invoke_pipe_buf_ptr += (unsigned int)n; if (THIS->invoke_pipe_buf_ptr >= sizeof(THIS->invoke_pipe_buf)) { THIS->invoke_pipe_buf_ptr -= sizeof(THIS->invoke_pipe_buf); ((void (*)(void *))(THIS->invoke_pipe_buf[1]))(THIS->invoke_pipe_buf[0]); } } } else { switch(THIS->sockets[fd_idx].base.type) { case ANODE_SOCKET_DATAGRAM: AnodeSystemTransport__poll_do_read_datagram(THIS,THIS->fds[fd_idx].fd,&THIS->sockets[fd_idx]); break; case ANODE_SOCKET_STREAM_LISTEN: AnodeSystemTransport__poll_do_accept_incoming_connection(THIS,THIS->fds[fd_idx].fd,&THIS->sockets[fd_idx]); break; case ANODE_SOCKET_STREAM_CONNECTION: if (THIS->sockets[fd_idx].base.state == ANODE_SOCKET_CONNECTING) AnodeSystemTransport__poll_do_outgoing_connect(THIS,THIS->fds[fd_idx].fd,&THIS->sockets[fd_idx]); else AnodeSystemTransport__poll_do_read_stream(THIS,THIS->fds[fd_idx].fd,&THIS->sockets[fd_idx]); break; } ++event_count; } } if ((THIS->fds[fd_idx].revents & POLLOUT)) { if (THIS->sockets[fd_idx].base.state == ANODE_SOCKET_CONNECTING) AnodeSystemTransport__poll_do_outgoing_connect(THIS,THIS->fds[fd_idx].fd,&THIS->sockets[fd_idx]); else AnodeSystemTransport__poll_do_stream_available_for_write(THIS,THIS->fds[fd_idx].fd,&THIS->sockets[fd_idx]); ++event_count; } } } } return event_count; } static int AnodeSystemTransport_supports_address_type(const AnodeTransport *transport, enum AnodeNetworkAddressType at) { switch(at) { case ANODE_NETWORK_ADDRESS_IPV4: return 1; case ANODE_NETWORK_ADDRESS_IPV6: return 1; default: if (THIS->base) return THIS->base->supports_address_type(THIS->base,at); return 0; } } static AnodeTransport *AnodeSystemTransport_base_instance(const AnodeTransport *transport) { return THIS->base; } static const char *AnodeSystemTransport_class_name(AnodeTransport *transport) { return AnodeSystemTransport_CLASS; } static void AnodeSystemTransport_delete(AnodeTransport *transport) { close(THIS->invoke_pipe[0]); close(THIS->invoke_pipe[1]); AnodeMutex_destroy(&THIS->invoke_pipe_m); if (THIS->fds) free(THIS->fds); if (THIS->sockets) free(THIS->sockets); if (THIS->base) THIS->base->delete(THIS->base); free(transport); } /* ======================================================================== */ AnodeTransport *AnodeSystemTransport_new(AnodeTransport *base) { struct AnodeSystemTransport *t; unsigned int entry_idx; t = malloc(sizeof(struct AnodeSystemTransport)); if (!t) return (AnodeTransport *)0; Anode_zero(t,sizeof(struct AnodeSystemTransport)); t->interface.invoke = &AnodeSystemTransport_invoke; t->interface.dns_resolve = &AnodeSystemTransport_dns_resolve; t->interface.datagram_listen = &AnodeSystemTransport_datagram_listen; t->interface.stream_listen = &AnodeSystemTransport_stream_listen; t->interface.datagram_send = &AnodeSystemTransport_datagram_send; t->interface.stream_connect = &AnodeSystemTransport_stream_connect; t->interface.stream_start_writing = &AnodeSystemTransport_stream_start_writing; t->interface.stream_stop_writing = &AnodeSystemTransport_stream_stop_writing; t->interface.stream_send = &AnodeSystemTransport_stream_send; t->interface.close = &AnodeSystemTransport_close; t->interface.poll = &AnodeSystemTransport_poll; t->interface.supports_address_type = &AnodeSystemTransport_supports_address_type; t->interface.base_instance = &AnodeSystemTransport_base_instance; t->interface.class_name = &AnodeSystemTransport_class_name; t->interface.delete = &AnodeSystemTransport_delete; t->base = base; pipe(t->invoke_pipe); fcntl(t->invoke_pipe[0],F_SETFL,O_NONBLOCK); entry_idx = AnodeSystemTransport__add_entry(t); t->fds[entry_idx].fd = t->invoke_pipe[0]; t->fds[entry_idx].events = POLLIN; t->fds[entry_idx].revents = 0; AnodeMutex_init(&t->invoke_pipe_m); return (AnodeTransport *)t; }