2 * Copyright (c) 2011-2012 Intel Corporation. All rights reserved.
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
36 #include <sys/types.h>
37 #include <sys/socket.h>
47 #include <netinet/tcp.h>
49 #include <semaphore.h>
54 #include <rdma/rdma_cma.h>
55 #include <rdma/rdma_verbs.h>
56 #include <rdma/rsocket.h>
/*
 * Function-pointer slots, one per libc entry point this library wraps.
 * The enclosing struct header is outside this excerpt; the `real` and `rs`
 * tables in this file are of type struct socket_calls and init_preload()
 * assigns to members with these names, so this is presumably that struct's
 * member list.
 */
61 int (*socket)(int domain, int type, int protocol);
62 int (*bind)(int socket, const struct sockaddr *addr, socklen_t addrlen);
63 int (*listen)(int socket, int backlog);
64 int (*accept)(int socket, struct sockaddr *addr, socklen_t *addrlen);
65 int (*connect)(int socket, const struct sockaddr *addr, socklen_t addrlen);
66 ssize_t (*recv)(int socket, void *buf, size_t len, int flags);
67 ssize_t (*recvfrom)(int socket, void *buf, size_t len, int flags,
68 struct sockaddr *src_addr, socklen_t *addrlen);
69 ssize_t (*recvmsg)(int socket, struct msghdr *msg, int flags);
70 ssize_t (*read)(int socket, void *buf, size_t count);
71 ssize_t (*readv)(int socket, const struct iovec *iov, int iovcnt);
72 ssize_t (*send)(int socket, const void *buf, size_t len, int flags);
73 ssize_t (*sendto)(int socket, const void *buf, size_t len, int flags,
74 const struct sockaddr *dest_addr, socklen_t addrlen);
75 ssize_t (*sendmsg)(int socket, const struct msghdr *msg, int flags);
76 ssize_t (*write)(int socket, const void *buf, size_t count);
77 ssize_t (*writev)(int socket, const struct iovec *iov, int iovcnt);
78 int (*poll)(struct pollfd *fds, nfds_t nfds, int timeout);
79 int (*shutdown)(int socket, int how);
80 int (*close)(int socket);
81 int (*getpeername)(int socket, struct sockaddr *addr, socklen_t *addrlen);
82 int (*getsockname)(int socket, struct sockaddr *addr, socklen_t *addrlen);
83 int (*setsockopt)(int socket, int level, int optname,
84 const void *optval, socklen_t optlen);
85 int (*getsockopt)(int socket, int level, int optname,
86 void *optval, socklen_t *optlen);
87 int (*fcntl)(int socket, int cmd, ... /* arg */);
88 int (*dup2)(int oldfd, int newfd);
89 ssize_t (*sendfile)(int out_fd, int in_fd, off_t *offset, size_t count);
/* Resolved from the symbol "__fxstat" in init_preload(). */
90 int (*fxstat)(int ver, int fd, struct stat *buf);
/*
 * Call tables: init_preload() fills `real` via dlsym(RTLD_NEXT, ...) and
 * `rs` via dlsym(RTLD_DEFAULT, "r<name>").
 */
93 static struct socket_calls real;
94 static struct socket_calls rs;
/* Maps an application-visible descriptor number to its struct fd_info. */
96 static struct index_map idm;
/* Taken around idm_set() calls and around the body of init_preload(). */
97 static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
/* Set by getenv_options() from RS_INLINE and RDMAV_FORK_SAFE respectively. */
101 static int sq_inline;
102 static int fork_support;
/*
 * Member of a struct whose header is not visible in this excerpt; the code
 * reads it as fdi->state, so it presumably belongs to struct fd_info.
 */
119 enum fd_fork_state state;
/*
 * Members are elided in this excerpt; the code in this file uses .name,
 * .domain, .type and .protocol on entries of this type.
 */
125 struct config_entry {
/* Table built by scan_config(); config_cnt is the number of valid entries. */
132 static struct config_entry *config;
133 static int config_cnt;
/*
 * Releases the program-name string of config entries, decrementing
 * config_cnt before each access so entries are freed from the end.
 * The surrounding loop and any release of the array itself are not visible
 * in this excerpt.
 */
135 static void free_config(void)
138 free(config[--config_cnt].name);
144 * Config file format:
145 * # Starting '#' indicates comment
146 * # wild card values are supported using '*'
147 * # domain - *, INET, INET6, IB
148 * # type - *, STREAM, DGRAM
149 * # protocol - *, TCP, UDP
150 * program_name domain type protocol
/*
 * Parses RS_CONF_DIR "/preload_config" line by line into the global
 * `config` array.
 *
 * A line is used only when it yields four whitespace-separated fields:
 * program name, domain, type and protocol.  Each new entry is zeroed
 * first, and a "*" field leaves the corresponding member zero/NULL, which
 * intercept_socket() treats as "match anything".  Keyword comparison is
 * case-insensitive; the handling of an unrecognised keyword is on lines
 * not visible in this excerpt.
 */
152 static void scan_config(void)
154 struct config_entry *new_config;
156 char line[120], prog[64], dom[16], type[16], proto[16];
158 fp = fopen(RS_CONF_DIR "/preload_config", "r");
162 while (fgets(line, sizeof(line), fp)) {
/*
 * A %Ns conversion stores up to N characters plus a terminating NUL, so
 * each width must be one less than the size of its destination buffer.
 */
166 if (sscanf(line, "%63s%15s%15s%15s", prog, dom, type, proto) != 4)
/* Grow through a temporary so the old array survives a failed realloc. */
169 new_config = realloc(config, (config_cnt + 1) *
170 sizeof(struct config_entry));
175 memset(&config[config_cnt], 0, sizeof(struct config_entry));
177 if (!strcasecmp(dom, "INET") ||
178 !strcasecmp(dom, "AF_INET") ||
179 !strcasecmp(dom, "PF_INET")) {
180 config[config_cnt].domain = AF_INET;
181 } else if (!strcasecmp(dom, "INET6") ||
182 !strcasecmp(dom, "AF_INET6") ||
183 !strcasecmp(dom, "PF_INET6")) {
184 config[config_cnt].domain = AF_INET6;
185 } else if (!strcasecmp(dom, "IB") ||
186 !strcasecmp(dom, "AF_IB") ||
187 !strcasecmp(dom, "PF_IB")) {
188 config[config_cnt].domain = AF_IB;
189 } else if (strcmp(dom, "*")) {
193 if (!strcasecmp(type, "STREAM") ||
194 !strcasecmp(type, "SOCK_STREAM")) {
195 config[config_cnt].type = SOCK_STREAM;
196 } else if (!strcasecmp(type, "DGRAM") ||
197 !strcasecmp(type, "SOCK_DGRAM")) {
198 config[config_cnt].type = SOCK_DGRAM;
199 } else if (strcmp(type, "*")) {
203 if (!strcasecmp(proto, "TCP") ||
204 !strcasecmp(proto, "IPPROTO_TCP")) {
205 config[config_cnt].protocol = IPPROTO_TCP;
206 } else if (!strcasecmp(proto, "UDP") ||
207 !strcasecmp(proto, "IPPROTO_UDP")) {
208 config[config_cnt].protocol = IPPROTO_UDP;
209 } else if (strcmp(proto, "*")) {
/* A "*" program name leaves .name NULL; anything else is duplicated. */
213 if (strcmp(prog, "*")) {
214 if (!(config[config_cnt].name = strdup(prog)))
/*
 * Tests a socket() request against the entries loaded into `config`.
 *
 * The protocol is first derived from the type (SOCK_STREAM -> TCP,
 * SOCK_DGRAM -> UDP); the condition guarding that step is elided here,
 * presumably "protocol not given".  An entry matches when each of its
 * name/domain/type/protocol members is either unset or equal to the
 * request.  The name comparison is case-insensitive and only covers
 * strlen(config[i].name) characters, so a configured name also matches any
 * program whose short name merely starts with it.
 * Return statements are not visible in this excerpt.
 */
226 static int intercept_socket(int domain, int type, int protocol)
234 if (type == SOCK_STREAM)
235 protocol = IPPROTO_TCP;
236 else if (type == SOCK_DGRAM)
237 protocol = IPPROTO_UDP;
240 for (i = 0; i < config_cnt; i++) {
241 if ((!config[i].name ||
242 !strncasecmp(config[i].name, program_invocation_short_name,
243 strlen(config[i].name))) &&
244 (!config[i].domain || config[i].domain == domain) &&
245 (!config[i].type || config[i].type == type) &&
246 (!config[i].protocol || config[i].protocol == protocol))
/*
 * Creates a tracking entry for a new application-visible descriptor.
 *
 * A zeroed fd_info is allocated, a descriptor number is reserved by opening
 * /dev/null read-only, the reference count is set to 1 and the entry is
 * registered in `idm` under that number while holding `mut`.
 * Error handling and the return value are on lines not visible here.
 */
253 static int fd_open(void)
258 fdi = calloc(1, sizeof(*fdi));
262 index = open("/dev/null", O_RDONLY);
269 atomic_store(&fdi->refcnt, 1);
270 pthread_mutex_lock(&mut);
271 ret = idm_set(&idm, index, fdi);
272 pthread_mutex_unlock(&mut);
/*
 * Fetches the fd_info registered at `index` with idm_at().  The assignments
 * are elided in this excerpt; presumably fd, type and state are recorded in
 * that entry, since callers pass the new underlying descriptor and its kind.
 */
285 static void fd_store(int index, int fd, enum fd_type type, enum fd_fork_state state)
289 fdi = idm_at(&idm, index);
/*
 * Looks up `index` in `idm`.  The rest of the body is elided; callers read
 * the underlying descriptor from *fd and compare the returned type against
 * fd_rsocket to choose between the rsocket and the real implementation.
 */
295 static inline enum fd_type fd_get(int index, int *fd)
299 fdi = idm_lookup(&idm, index);
/*
 * Returns the underlying descriptor recorded for `index`, or `index` itself
 * when the descriptor is not tracked in `idm`.
 */
310 static inline int fd_getd(int index)
314 fdi = idm_lookup(&idm, index);
315 return fdi ? fdi->fd : index;
/*
 * Returns the fork state recorded for `index`, or fd_ready when the
 * descriptor is not tracked in `idm`.
 */
318 static inline enum fd_fork_state fd_gets(int index)
322 fdi = idm_lookup(&idm, index);
323 return fdi ? fdi->state : fd_ready;
/*
 * Returns the descriptor type recorded for `index`, or fd_normal when the
 * descriptor is not tracked in `idm`.
 */
326 static inline enum fd_type fd_gett(int index)
330 fdi = idm_lookup(&idm, index);
331 return fdi ? fdi->type : fd_normal;
/*
 * Looks up `index` and removes its entry from `idm`.  What is written to
 * *fd, how the entry and the placeholder descriptor are released, and the
 * return value are on lines not visible in this excerpt.
 */
334 static enum fd_type fd_close(int index, int *fd)
339 fdi = idm_lookup(&idm, index);
341 idm_clear(&idm, index);
/*
 * Reads tuning knobs from the environment: RS_SQ_SIZE, RS_RQ_SIZE,
 * RS_INLINE and RDMAV_FORK_SAFE.  The visible assignments parse RS_INLINE
 * into sq_inline and RDMAV_FORK_SAFE into fork_support with atoi(); the
 * handling of the two size variables and the NULL checks are elided.
 */
353 static void getenv_options(void)
357 var = getenv("RS_SQ_SIZE");
361 var = getenv("RS_RQ_SIZE");
365 var = getenv("RS_INLINE");
367 sq_inline = atoi(var);
369 var = getenv("RDMAV_FORK_SAFE");
371 fork_support = atoi(var);
/*
 * Initialisation of the two call tables.
 *
 * After an unlocked fast-path check (condition elided), the body runs under
 * `mut`.  `real` receives the next definition of each wrapped symbol in
 * library search order (RTLD_NEXT), i.e. the one this library shadows;
 * `rs` receives the "r"-prefixed rsocket counterpart found through the
 * default search order (RTLD_DEFAULT).  None of the dlsym() results are
 * checked on the visible lines.
 */
374 static void init_preload(void)
378 /* Quick check without lock */
382 pthread_mutex_lock(&mut);
386 real.socket = dlsym(RTLD_NEXT, "socket");
387 real.bind = dlsym(RTLD_NEXT, "bind");
388 real.listen = dlsym(RTLD_NEXT, "listen");
389 real.accept = dlsym(RTLD_NEXT, "accept");
390 real.connect = dlsym(RTLD_NEXT, "connect");
391 real.recv = dlsym(RTLD_NEXT, "recv");
392 real.recvfrom = dlsym(RTLD_NEXT, "recvfrom");
393 real.recvmsg = dlsym(RTLD_NEXT, "recvmsg");
394 real.read = dlsym(RTLD_NEXT, "read");
395 real.readv = dlsym(RTLD_NEXT, "readv");
396 real.send = dlsym(RTLD_NEXT, "send");
397 real.sendto = dlsym(RTLD_NEXT, "sendto");
398 real.sendmsg = dlsym(RTLD_NEXT, "sendmsg");
399 real.write = dlsym(RTLD_NEXT, "write");
400 real.writev = dlsym(RTLD_NEXT, "writev");
401 real.poll = dlsym(RTLD_NEXT, "poll");
402 real.shutdown = dlsym(RTLD_NEXT, "shutdown");
403 real.close = dlsym(RTLD_NEXT, "close");
404 real.getpeername = dlsym(RTLD_NEXT, "getpeername");
405 real.getsockname = dlsym(RTLD_NEXT, "getsockname");
406 real.setsockopt = dlsym(RTLD_NEXT, "setsockopt");
407 real.getsockopt = dlsym(RTLD_NEXT, "getsockopt");
408 real.fcntl = dlsym(RTLD_NEXT, "fcntl");
409 real.dup2 = dlsym(RTLD_NEXT, "dup2");
410 real.sendfile = dlsym(RTLD_NEXT, "sendfile");
/* The stat wrapper in this file is defined under the name __fxstat. */
411 real.fxstat = dlsym(RTLD_NEXT, "__fxstat");
413 rs.socket = dlsym(RTLD_DEFAULT, "rsocket");
414 rs.bind = dlsym(RTLD_DEFAULT, "rbind");
415 rs.listen = dlsym(RTLD_DEFAULT, "rlisten");
416 rs.accept = dlsym(RTLD_DEFAULT, "raccept");
417 rs.connect = dlsym(RTLD_DEFAULT, "rconnect");
418 rs.recv = dlsym(RTLD_DEFAULT, "rrecv");
419 rs.recvfrom = dlsym(RTLD_DEFAULT, "rrecvfrom");
420 rs.recvmsg = dlsym(RTLD_DEFAULT, "rrecvmsg");
421 rs.read = dlsym(RTLD_DEFAULT, "rread");
422 rs.readv = dlsym(RTLD_DEFAULT, "rreadv");
423 rs.send = dlsym(RTLD_DEFAULT, "rsend");
424 rs.sendto = dlsym(RTLD_DEFAULT, "rsendto");
425 rs.sendmsg = dlsym(RTLD_DEFAULT, "rsendmsg");
426 rs.write = dlsym(RTLD_DEFAULT, "rwrite");
427 rs.writev = dlsym(RTLD_DEFAULT, "rwritev");
428 rs.poll = dlsym(RTLD_DEFAULT, "rpoll");
429 rs.shutdown = dlsym(RTLD_DEFAULT, "rshutdown");
430 rs.close = dlsym(RTLD_DEFAULT, "rclose");
431 rs.getpeername = dlsym(RTLD_DEFAULT, "rgetpeername");
432 rs.getsockname = dlsym(RTLD_DEFAULT, "rgetsockname");
433 rs.setsockopt = dlsym(RTLD_DEFAULT, "rsetsockopt");
434 rs.getsockopt = dlsym(RTLD_DEFAULT, "rgetsockopt");
435 rs.fcntl = dlsym(RTLD_DEFAULT, "rfcntl");
441 pthread_mutex_unlock(&mut);
445 * We currently only handle copying a few common values.
/*
 * Copies a small set of settings from descriptor `sfd` to descriptor `dfd`.
 *
 * `sapi` is the call table used to query the source and `dapi` the one used
 * to configure the destination, so the two descriptors may be of different
 * kinds (real socket vs. rsocket).  In order, the visible steps copy the
 * file status flags (F_GETFL -> F_SETFL), SO_REUSEADDR and TCP_NODELAY.
 * The checks between the get and set calls and the return statement are on
 * lines not visible in this excerpt.
 *
 * The option value is passed by address (&param); the listing previously
 * carried a mis-encoded character in place of "&para".
 */
447 static int copysockopts(int dfd, int sfd, struct socket_calls *dapi,
448 struct socket_calls *sapi)
453 ret = sapi->fcntl(sfd, F_GETFL);
455 ret = dapi->fcntl(dfd, F_SETFL, ret);
460 ret = sapi->getsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &param, &len);
462 ret = dapi->setsockopt(dfd, SOL_SOCKET, SO_REUSEADDR, &param, len);
467 ret = sapi->getsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &param, &len);
469 ret = dapi->setsockopt(dfd, IPPROTO_TCP, TCP_NODELAY, &param, len);
477 * Convert between an rsocket and a normal socket.
/*
 * Replaces the descriptor behind `socket` with a freshly created one of
 * kind `new_type`.
 *
 * The source/destination call tables are chosen from new_type (assignments
 * elided).  getsockname() is called with a NULL address buffer only to
 * obtain the address length, which selects PF_INET6 when it equals
 * sizeof(struct sockaddr_in6) and PF_INET otherwise.  A SOCK_STREAM
 * descriptor is then created through the destination table, the common
 * options are copied over, and the mapping for `socket` is updated to the
 * new descriptor in state fd_ready.
 * Error paths and the disposal of the old descriptor are not visible here.
 */
479 static int transpose_socket(int socket, enum fd_type new_type)
482 int sfd, dfd, param, ret;
483 struct socket_calls *sapi, *dapi;
485 sfd = fd_getd(socket);
486 if (new_type == fd_rsocket) {
494 ret = sapi->getsockname(sfd, NULL, &len);
498 param = (len == sizeof(struct sockaddr_in6)) ? PF_INET6 : PF_INET;
499 dfd = dapi->socket(param, SOCK_STREAM, 0);
503 ret = copysockopts(dfd, sfd, dapi, sapi);
507 fd_store(socket, dfd, new_type, fd_ready);
516 * Use defaults on failure.
/*
 * Applies the send-queue size, receive-queue size and inline size to an
 * rsocket through SOL_RDMA options.  The return value of rsetsockopt() is
 * not used on the visible lines; the conditions guarding each call are
 * elided in this excerpt.
 */
518 static void set_rsocket_options(int rsocket)
521 rsetsockopt(rsocket, SOL_RDMA, RDMA_SQSIZE, &sq_size, sizeof sq_size);
524 rsetsockopt(rsocket, SOL_RDMA, RDMA_RQSIZE, &rq_size, sizeof rq_size);
527 rsetsockopt(rsocket, SOL_RDMA, RDMA_INLINE, &sq_inline, sizeof sq_inline);
/*
 * Replacement for socket().
 *
 * `recursive` is a per-thread flag; a request made while it is set, or one
 * that intercept_socket() rejects, is not handled here (the jump target is
 * elided, presumably the real.socket() call at the end).
 *
 * With fork_support enabled and an IPv4/IPv6 TCP stream request, a real
 * socket is created and tracked as fd_normal in state fd_fork.  Otherwise
 * an rsocket is created, tracked as fd_rsocket in state fd_ready, and given
 * the configured queue options.  The fd_close() call presumably undoes the
 * tracking entry when creation failed - the guarding condition is elided.
 */
530 int socket(int domain, int type, int protocol)
532 static __thread int recursive;
537 if (recursive || !intercept_socket(domain, type, protocol))
544 if (fork_support && (domain == PF_INET || domain == PF_INET6) &&
545 (type == SOCK_STREAM) && (!protocol || protocol == IPPROTO_TCP)) {
546 ret = real.socket(domain, type, protocol);
549 fd_store(index, ret, fd_normal, fd_fork);
554 ret = rsocket(domain, type, protocol);
557 fd_store(index, ret, fd_rsocket, fd_ready);
558 set_rsocket_options(ret);
561 fd_close(index, &ret);
563 return real.socket(domain, type, protocol);
/*
 * Replacement for bind(): forwards to rbind() when `socket` maps to an
 * rsocket, otherwise to the real bind(), using the descriptor that fd_get()
 * stored in fd.
 */
566 int bind(int socket, const struct sockaddr *addr, socklen_t addrlen)
569 return (fd_get(socket, &fd) == fd_rsocket) ?
570 rbind(fd, addr, addrlen) : real.bind(fd, addr, addrlen);
/*
 * Replacement for listen().  Rsockets go to rlisten().  For other
 * descriptors the real listen() is called and, when it succeeds on a
 * descriptor in state fd_fork, the state is advanced to fd_fork_listen so
 * that accept() can recognise it.
 */
573 int listen(int socket, int backlog)
576 if (fd_get(socket, &fd) == fd_rsocket) {
577 ret = rlisten(fd, backlog);
579 ret = real.listen(fd, backlog);
580 if (!ret && fd_gets(socket) == fd_fork)
581 fd_store(socket, fd, fd_normal, fd_fork_listen);
/*
 * Replacement for accept(), with three cases:
 *  - listening rsocket: raccept(), result tracked as fd_rsocket/fd_ready;
 *  - real socket in state fd_fork_listen: real accept(), result tracked as
 *    fd_normal/fd_fork_passive;
 *  - anything else: plain real accept().
 * In the first two cases `index` is a tracking slot (its creation is
 * elided); the fd_close(index, ...) calls presumably release it when the
 * accept itself failed.
 */
586 int accept(int socket, struct sockaddr *addr, socklen_t *addrlen)
590 if (fd_get(socket, &fd) == fd_rsocket) {
595 ret = raccept(fd, addr, addrlen);
597 fd_close(index, &fd);
601 fd_store(index, ret, fd_rsocket, fd_ready);
603 } else if (fd_gets(socket) == fd_fork_listen) {
608 ret = real.accept(fd, addr, addrlen);
610 fd_close(index, &fd);
614 fd_store(index, ret, fd_normal, fd_fork_passive);
617 return real.accept(fd, addr, addrlen);
622 * We can't fork RDMA connections and pass them from the parent to the child
623 * process. Instead, we need to establish the RDMA connection after calling
624 * fork. To do this, we delay establishing the RDMA connection until we try
625 * to send/receive on the server side.
/*
 * Connecting-side half of the deferred RDMA connection set-up.
 *
 * The real socket's file status flags are saved and cleared to 0 around a
 * MSG_PEEK receive of `msg`, then restored.  The set-up continues only if a
 * full msg was read and its value is zero (the else branch is elided).
 * It then takes the peer address of the real socket, creates an rsocket of
 * the same family, connects it to that address, applies the rsocket and
 * copied socket options, shuts the real socket down and re-points `socket`
 * at the rsocket.
 * The final fd_store() keeps the real descriptor as fd_normal/fd_ready;
 * presumably this is the fallback taken when any step fails.
 */
627 static void fork_active(int socket)
629 struct sockaddr_storage addr;
635 sfd = fd_getd(socket);
637 flags = real.fcntl(sfd, F_GETFL);
638 real.fcntl(sfd, F_SETFL, 0);
639 ret = real.recv(sfd, &msg, sizeof msg, MSG_PEEK);
640 real.fcntl(sfd, F_SETFL, flags);
641 if ((ret != sizeof msg) || msg)
645 ret = real.getpeername(sfd, (struct sockaddr *) &addr, &len);
649 dfd = rsocket(addr.ss_family, SOCK_STREAM, 0);
653 ret = rconnect(dfd, (struct sockaddr *) &addr, len);
657 set_rsocket_options(dfd);
658 copysockopts(dfd, sfd, &rs, &real);
659 real.shutdown(sfd, SHUT_RDWR);
661 fd_store(socket, dfd, fd_rsocket, fd_ready);
667 fd_store(socket, sfd, fd_normal, fd_ready);
671 * The server will start listening for the new connection, then send a
672 * message to the active side when the listen is ready. This does leave
673 * fork unsupported in the following case: the server is nonblocking and
674 * calls select/poll waiting to receive data from the client.
/*
 * Accepting-side half of the deferred RDMA connection set-up.
 *
 * Steps visible in this excerpt:
 *  1. Read the local address of the real socket and clear its flow info,
 *     scope id and address bytes.
 *  2. Open the named semaphore "/rsocket_fork" (created with initial value
 *     1 if absent); how it is waited on and posted is on elided lines.
 *  3. Create a listening rsocket of the same family with SO_REUSEADDR,
 *     bind it to the address from step 1 and listen with a backlog of 1.
 *  4. Write `msg` on the real socket, then accept the RDMA connection.
 *  5. Apply rsocket and copied socket options, shut the real socket down
 *     and re-point `socket` at the accepted rsocket.
 * The final fd_store() keeps the real descriptor as fd_normal/fd_ready;
 * presumably this is the fallback taken when any step fails.
 *
 * The SO_REUSEADDR value is passed by address (&param); the listing
 * previously carried a mis-encoded character in place of "&para".
 */
676 static void fork_passive(int socket)
678 struct sockaddr_in6 sin6;
680 int lfd, sfd, dfd, ret, param;
684 sfd = fd_getd(socket);
687 ret = real.getsockname(sfd, (struct sockaddr *) &sin6, &len);
690 sin6.sin6_flowinfo = 0;
691 sin6.sin6_scope_id = 0;
692 memset(&sin6.sin6_addr, 0, sizeof sin6.sin6_addr);
694 sem = sem_open("/rsocket_fork", O_CREAT | O_RDWR,
695 S_IRWXU | S_IRWXG, 1);
696 if (sem == SEM_FAILED) {
701 lfd = rsocket(sin6.sin6_family, SOCK_STREAM, 0);
708 rsetsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &param, sizeof param);
711 ret = rbind(lfd, (struct sockaddr *) &sin6, sizeof sin6);
715 ret = rlisten(lfd, 1);
/* A short write is checked on the next line; its handling is elided. */
720 len = real.write(sfd, &msg, sizeof msg);
721 if (len != sizeof msg)
724 dfd = raccept(lfd, NULL, NULL);
730 set_rsocket_options(dfd);
731 copysockopts(dfd, sfd, &rs, &real);
732 real.shutdown(sfd, SHUT_RDWR);
734 fd_store(socket, dfd, fd_rsocket, fd_ready);
743 fd_store(socket, sfd, fd_normal, fd_ready);
/*
 * Variant of fd_get() used by the data-transfer wrappers.  It inspects the
 * entry's fork state before returning; the actions taken for
 * fd_fork_passive and fd_fork_active are elided here (dup2() in this file
 * calls fork_passive() for the former, so presumably the pending RDMA
 * connection is established at this point).
 */
746 static inline enum fd_type fd_fork_get(int index, int *fd)
750 fdi = idm_lookup(&idm, index);
752 if (fdi->state == fd_fork_passive)
754 else if (fdi->state == fd_fork_active)
/*
 * Replacement for connect().
 *
 * For an rsocket, rconnect() is tried first; success or EINPROGRESS ends
 * the attempt there (the statement is elided, presumably a return).
 * Otherwise the descriptor is converted to a normal socket with
 * transpose_socket() before continuing.  A real socket in state fd_fork is
 * marked fd_fork_active.  The call finally reaches the real connect() with
 * the descriptor held in fd.
 * NOTE(review): the update of fd after transpose_socket() is not visible
 * in this excerpt - confirm against the full source.
 */
765 int connect(int socket, const struct sockaddr *addr, socklen_t addrlen)
769 if (fd_get(socket, &fd) == fd_rsocket) {
770 ret = rconnect(fd, addr, addrlen);
771 if (!ret || errno == EINPROGRESS)
774 ret = transpose_socket(socket, fd_normal);
780 } else if (fd_gets(socket) == fd_fork) {
781 fd_store(socket, fd, fd_normal, fd_fork_active);
784 return real.connect(fd, addr, addrlen);
/*
 * Replacement for recv(): resolves the descriptor through fd_fork_get() and
 * forwards to rrecv() for rsockets, otherwise to the real recv().
 */
787 ssize_t recv(int socket, void *buf, size_t len, int flags)
790 return (fd_fork_get(socket, &fd) == fd_rsocket) ?
791 rrecv(fd, buf, len, flags) : real.recv(fd, buf, len, flags);
/*
 * Replacement for recvfrom(): resolves the descriptor through fd_fork_get()
 * and forwards to rrecvfrom() for rsockets, otherwise to the real
 * recvfrom().
 */
794 ssize_t recvfrom(int socket, void *buf, size_t len, int flags,
795 struct sockaddr *src_addr, socklen_t *addrlen)
798 return (fd_fork_get(socket, &fd) == fd_rsocket) ?
799 rrecvfrom(fd, buf, len, flags, src_addr, addrlen) :
800 real.recvfrom(fd, buf, len, flags, src_addr, addrlen);
/*
 * Replacement for recvmsg(): resolves the descriptor through fd_fork_get()
 * and forwards to rrecvmsg() for rsockets, otherwise to the real recvmsg().
 */
803 ssize_t recvmsg(int socket, struct msghdr *msg, int flags)
806 return (fd_fork_get(socket, &fd) == fd_rsocket) ?
807 rrecvmsg(fd, msg, flags) : real.recvmsg(fd, msg, flags);
/*
 * Replacement for read(): resolves the descriptor through fd_fork_get() and
 * forwards to rread() for rsockets, otherwise to the real read().
 */
810 ssize_t read(int socket, void *buf, size_t count)
814 return (fd_fork_get(socket, &fd) == fd_rsocket) ?
815 rread(fd, buf, count) : real.read(fd, buf, count);
/*
 * Replacement for readv(): resolves the descriptor through fd_fork_get()
 * and forwards to rreadv() for rsockets, otherwise to the real readv().
 */
818 ssize_t readv(int socket, const struct iovec *iov, int iovcnt)
822 return (fd_fork_get(socket, &fd) == fd_rsocket) ?
823 rreadv(fd, iov, iovcnt) : real.readv(fd, iov, iovcnt);
/*
 * Replacement for send(): resolves the descriptor through fd_fork_get() and
 * forwards to rsend() for rsockets, otherwise to the real send().
 */
826 ssize_t send(int socket, const void *buf, size_t len, int flags)
829 return (fd_fork_get(socket, &fd) == fd_rsocket) ?
830 rsend(fd, buf, len, flags) : real.send(fd, buf, len, flags);
/*
 * Replacement for sendto(): resolves the descriptor through fd_fork_get()
 * and forwards to rsendto() for rsockets, otherwise to the real sendto().
 */
833 ssize_t sendto(int socket, const void *buf, size_t len, int flags,
834 const struct sockaddr *dest_addr, socklen_t addrlen)
837 return (fd_fork_get(socket, &fd) == fd_rsocket) ?
838 rsendto(fd, buf, len, flags, dest_addr, addrlen) :
839 real.sendto(fd, buf, len, flags, dest_addr, addrlen);
/*
 * Replacement for sendmsg(): resolves the descriptor through fd_fork_get()
 * and forwards to rsendmsg() for rsockets, otherwise to the real sendmsg().
 */
842 ssize_t sendmsg(int socket, const struct msghdr *msg, int flags)
845 return (fd_fork_get(socket, &fd) == fd_rsocket) ?
846 rsendmsg(fd, msg, flags) : real.sendmsg(fd, msg, flags);
/*
 * Replacement for write(): resolves the descriptor through fd_fork_get()
 * and forwards to rwrite() for rsockets, otherwise to the real write().
 */
849 ssize_t write(int socket, const void *buf, size_t count)
853 return (fd_fork_get(socket, &fd) == fd_rsocket) ?
854 rwrite(fd, buf, count) : real.write(fd, buf, count);
/*
 * Replacement for writev(): resolves the descriptor through fd_fork_get()
 * and forwards to rwritev() for rsockets, otherwise to the real writev().
 */
857 ssize_t writev(int socket, const struct iovec *iov, int iovcnt)
861 return (fd_fork_get(socket, &fd) == fd_rsocket) ?
862 rwritev(fd, iov, iovcnt) : real.writev(fd, iov, iovcnt);
/*
 * Hands out a pollfd scratch array kept in per-thread static storage.
 * The visible lines allocate room for nfds entries and record the capacity
 * (0 when the allocation failed).  The condition that triggers a new
 * allocation, the release of the previous buffer and the return statement
 * are elided in this excerpt.
 */
865 static struct pollfd *fds_alloc(nfds_t nfds)
867 static __thread struct pollfd *rfds;
868 static __thread nfds_t rnfds;
874 rfds = malloc(sizeof(*rfds) * nfds);
875 rnfds = rfds ? nfds : 0;
/*
 * Replacement for poll().
 *
 * The first loop looks for any entry backed by an rsocket (the action on a
 * hit is elided); when none is involved the request goes to the real
 * poll() unchanged.  Otherwise a scratch copy of the array is built with
 * each descriptor translated to its underlying one, rpoll() is called on
 * the copy, and the resulting revents are copied back to the caller's
 * array.
 */
881 int poll(struct pollfd *fds, nfds_t nfds, int timeout)
887 for (i = 0; i < nfds; i++) {
888 if (fd_gett(fds[i].fd) == fd_rsocket)
892 return real.poll(fds, nfds, timeout);
895 rfds = fds_alloc(nfds);
899 for (i = 0; i < nfds; i++) {
900 rfds[i].fd = fd_getd(fds[i].fd);
901 rfds[i].events = fds[i].events;
905 ret = rpoll(rfds, nfds, timeout);
907 for (i = 0; i < nfds; i++)
908 fds[i].revents = rfds[i].revents;
/*
 * Converts select()-style descriptor sets into a pollfd array for rpoll().
 *
 * Every descriptor below *nfds is examined: membership in readfds requests
 * POLLIN, and membership in writefds adds a further event (line elided,
 * presumably POLLOUT).  A descriptor that requested any event, or that is
 * in exceptfds, is appended to fds with its underlying descriptor number.
 * nfds is passed by pointer; the store through it is not visible here,
 * presumably the number of entries written.
 */
913 static void select_to_rpoll(struct pollfd *fds, int *nfds,
914 fd_set *readfds, fd_set *writefds, fd_set *exceptfds)
916 int fd, events, i = 0;
918 for (fd = 0; fd < *nfds; fd++) {
919 events = (readfds && FD_ISSET(fd, readfds)) ? POLLIN : 0;
920 if (writefds && FD_ISSET(fd, writefds))
923 if (events || (exceptfds && FD_ISSET(fd, exceptfds))) {
924 fds[i].fd = fd_getd(fd);
925 fds[i++].events = events;
/*
 * Converts rpoll() results back into select()-style descriptor sets.
 *
 * Application descriptor numbers are walked upward while `i` indexes the
 * pollfd array; a number whose translated descriptor (rfd, assignment
 * elided) differs from fds[i].fd is skipped.  For a match, POLLIN maps to
 * readfds, POLLOUT to writefds, and any other revents bit to exceptfds.
 * The FD_SET for readfds, the advance of `i`, and the counting into `cnt`
 * are on lines not visible in this excerpt.
 */
932 static int rpoll_to_select(struct pollfd *fds, int nfds,
933 fd_set *readfds, fd_set *writefds, fd_set *exceptfds)
935 int fd, rfd, i, cnt = 0;
937 for (i = 0, fd = 0; i < nfds; fd++) {
939 if (rfd != fds[i].fd)
942 if (readfds && (fds[i].revents & POLLIN)) {
947 if (writefds && (fds[i].revents & POLLOUT)) {
948 FD_SET(fd, writefds);
952 if (exceptfds && (fds[i].revents & ~(POLLIN | POLLOUT))) {
953 FD_SET(fd, exceptfds);
/*
 * Converts a select() timeout to the millisecond form rpoll() takes:
 * a NULL timeout becomes -1, otherwise seconds and microseconds are
 * combined into whole milliseconds (sub-millisecond remainders are
 * truncated by the integer division).
 * NOTE(review): the sum is narrowed to int on return, so very large
 * tv_sec values would not be represented faithfully.
 */
962 static int rs_convert_timeout(struct timeval *timeout)
964 return !timeout ? -1 : timeout->tv_sec * 1000 + timeout->tv_usec / 1000;
/*
 * Replacement for select(), implemented on top of rpoll().
 *
 * A scratch pollfd array sized for nfds descriptors is obtained, the three
 * sets are translated into it (select_to_rpoll() receives &nfds and may
 * update it), rpoll() is called with the converted timeout, and the
 * results are translated back into the caller's sets.
 * Handling of allocation failure, of the sets between the two conversions
 * and of rpoll() errors is on lines not visible in this excerpt.
 */
967 int select(int nfds, fd_set *readfds, fd_set *writefds,
968 fd_set *exceptfds, struct timeval *timeout)
973 fds = fds_alloc(nfds);
977 select_to_rpoll(fds, &nfds, readfds, writefds, exceptfds);
978 ret = rpoll(fds, nfds, rs_convert_timeout(timeout));
988 ret = rpoll_to_select(fds, nfds, readfds, writefds, exceptfds);
/*
 * Replacement for shutdown(): forwards to rshutdown() when `socket` maps to
 * an rsocket, otherwise to the real shutdown().
 */
993 int shutdown(int socket, int how)
996 return (fd_get(socket, &fd) == fd_rsocket) ?
997 rshutdown(fd, how) : real.shutdown(fd, how);
/*
 * Replacement for close().
 *
 * Descriptors that are not tracked in `idm` go straight to the real
 * close() (the guarding test is elided).  For a tracked entry that records
 * a dupfd, this wrapper is first invoked again on that descriptor.
 * The entry's reference count is then decremented atomically; only when
 * the previous value was 1 does execution reach the idm_clear() and the
 * close of the underlying descriptor (rclose() or real close() by type) -
 * the statement taken otherwise is elided, presumably an early return.
 */
1000 int close(int socket)
1002 struct fd_info *fdi;
1006 fdi = idm_lookup(&idm, socket);
1008 return real.close(socket);
1010 if (fdi->dupfd != -1) {
1011 ret = close(fdi->dupfd);
1016 if (atomic_fetch_sub(&fdi->refcnt, 1) != 1)
1019 idm_clear(&idm, socket);
1021 ret = (fdi->type == fd_rsocket) ? rclose(fdi->fd) : real.close(fdi->fd);
/*
 * Replacement for getpeername(): forwards to rgetpeername() when `socket`
 * maps to an rsocket, otherwise to the real getpeername().
 */
1026 int getpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
1029 return (fd_get(socket, &fd) == fd_rsocket) ?
1030 rgetpeername(fd, addr, addrlen) :
1031 real.getpeername(fd, addr, addrlen);
/*
 * Replacement for getsockname(): forwards to rgetsockname() when `socket`
 * maps to an rsocket, otherwise to the real getsockname().
 */
1034 int getsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
1038 return (fd_get(socket, &fd) == fd_rsocket) ?
1039 rgetsockname(fd, addr, addrlen) :
1040 real.getsockname(fd, addr, addrlen);
/*
 * Replacement for setsockopt(): forwards to rsetsockopt() when `socket`
 * maps to an rsocket, otherwise to the real setsockopt().
 */
1043 int setsockopt(int socket, int level, int optname,
1044 const void *optval, socklen_t optlen)
1047 return (fd_get(socket, &fd) == fd_rsocket) ?
1048 rsetsockopt(fd, level, optname, optval, optlen) :
1049 real.setsockopt(fd, level, optname, optval, optlen);
/*
 * Replacement for getsockopt(): forwards to rgetsockopt() when `socket`
 * maps to an rsocket, otherwise to the real getsockopt().
 */
1052 int getsockopt(int socket, int level, int optname,
1053 void *optval, socklen_t *optlen)
1056 return (fd_get(socket, &fd) == fd_rsocket) ?
1057 rgetsockopt(fd, level, optname, optval, optlen) :
1058 real.getsockopt(fd, level, optname, optval, optlen);
/*
 * Replacement for fcntl().
 *
 * The optional third argument is fetched according to the command and the
 * call is forwarded to rfcntl() for rsockets or the real fcntl() otherwise.
 * Three forwarding shapes are visible: no argument, a long argument, and a
 * pointer argument.  The case labels selecting between them are elided in
 * this excerpt (only a commented-out F_DUPFD_CLOEXEC label is shown).
 */
1061 int fcntl(int socket, int cmd, ... /* arg */)
1069 va_start(args, cmd);
1076 ret = (fd_get(socket, &fd) == fd_rsocket) ?
1077 rfcntl(fd, cmd) : real.fcntl(fd, cmd);
1080 /*case F_DUPFD_CLOEXEC:*/
1087 lparam = va_arg(args, long);
1088 ret = (fd_get(socket, &fd) == fd_rsocket) ?
1089 rfcntl(fd, cmd, lparam) : real.fcntl(fd, cmd, lparam);
1092 pparam = va_arg(args, void *);
1093 ret = (fd_get(socket, &fd) == fd_rsocket) ?
1094 rfcntl(fd, cmd, pparam) : real.fcntl(fd, cmd, pparam);
1102 * dup2 is not thread safe
/*
 * Replacement for dup2() that keeps the descriptor tracking table in step.
 *
 * 1. If oldfd is tracked and still waiting for its deferred RDMA
 *    connection, that connection is established first (the fork_passive()
 *    call is visible; the fd_fork_active branch body is elided).
 * 2. A tracked newfd whose reference count exceeds 1 is treated specially
 *    (action elided).
 * 3. The real dup2() is performed.  If oldfd is untracked or the call did
 *    not return newfd, nothing more is recorded (statement elided,
 *    presumably a return of ret).
 * 4. Otherwise a new entry is created for newfd (registered under `mut`)
 *    sharing oldfd's underlying descriptor and type.  When oldfd is itself
 *    a duplicate, the new entry records the same dupfd and oldfdi is
 *    re-pointed at that entry; otherwise the new entry records oldfd as
 *    its dupfd.  The new entry starts with refcnt 1 and the entry oldfdi
 *    finally points at has its refcnt incremented.
 */
1104 int dup2(int oldfd, int newfd)
1106 struct fd_info *oldfdi, *newfdi;
1110 oldfdi = idm_lookup(&idm, oldfd);
1112 if (oldfdi->state == fd_fork_passive)
1113 fork_passive(oldfd);
1114 else if (oldfdi->state == fd_fork_active)
1118 newfdi = idm_lookup(&idm, newfd);
1120 /* newfd cannot have been dup'ed directly */
1121 if (atomic_load(&newfdi->refcnt) > 1)
1126 ret = real.dup2(oldfd, newfd);
1127 if (!oldfdi || ret != newfd)
1130 newfdi = calloc(1, sizeof(*newfdi));
1136 pthread_mutex_lock(&mut);
1137 idm_set(&idm, newfd, newfdi);
1138 pthread_mutex_unlock(&mut);
1140 newfdi->fd = oldfdi->fd;
1141 newfdi->type = oldfdi->type;
1142 if (oldfdi->dupfd != -1) {
1143 newfdi->dupfd = oldfdi->dupfd;
1144 oldfdi = idm_lookup(&idm, oldfdi->dupfd);
1146 newfdi->dupfd = oldfd;
1148 atomic_store(&newfdi->refcnt, 1);
1149 atomic_fetch_add(&oldfdi->refcnt, 1);
/*
 * Replacement for sendfile().
 *
 * When out_fd is not an rsocket the request is passed to the real
 * sendfile().  For an rsocket the requested range of in_fd is mapped
 * read-only, written to the rsocket with rwrite(), and unmapped again.
 *
 * The mapping is created with MAP_PRIVATE: mmap() requires one of
 * MAP_SHARED or MAP_PRIVATE in its flags, so a flags value of 0 fails and
 * makes the rsocket path unusable.  A private mapping is sufficient
 * because the pages are only read.
 *
 * NOTE(review): sendfile(2) advances the file position of in_fd only when
 * offset is NULL and otherwise updates *offset; the visible code instead
 * seeks when offset is non-NULL and maps from 0 when it is NULL.  Left
 * unchanged here - confirm the intended semantics against the full source.
 * NOTE(review): mmap() also requires the file offset to be page-aligned.
 */
1153 ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count)
1159 if (fd_get(out_fd, &fd) != fd_rsocket)
1160 return real.sendfile(fd, in_fd, offset, count);
1162 file_addr = mmap(NULL, count, PROT_READ, MAP_PRIVATE, in_fd, offset ? *offset : 0);
1163 if (file_addr == (void *) -1)
1166 ret = rwrite(fd, file_addr, count);
1167 if ((ret > 0) && offset)
1168 lseek(in_fd, ret, SEEK_CUR);
1169 munmap(file_addr, count);
/*
 * Replacement for __fxstat().
 *
 * For an rsocket the real call is made on the application-visible
 * descriptor `socket` rather than on the rsocket descriptor, and the
 * file-type bits of st_mode are then rewritten to __S_IFSOCK (the check on
 * ret guarding that rewrite is elided).  Other descriptors are passed to
 * the real call using the underlying descriptor in fd.
 * The end of the function lies beyond this excerpt.
 */
1173 int __fxstat(int ver, int socket, struct stat *buf)
1178 if (fd_get(socket, &fd) == fd_rsocket) {
1179 ret = real.fxstat(ver, socket, buf);
1181 buf->st_mode = (buf->st_mode & ~S_IFMT) | __S_IFSOCK;
1183 ret = real.fxstat(ver, fd, buf);