2 * Copyright (c) 2008-2009 Robert N. M. Watson
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
29 #include <sys/types.h>
30 #include <sys/event.h>
31 #include <sys/resource.h>
32 #include <sys/sched.h>
33 #include <sys/socket.h>
34 #include <sys/sysctl.h>
38 #include <netinet/in.h>
52 #define min(x, y) (x < y ? x : y)
54 #define timespecsub(vvp, uvp) \
56 (vvp)->tv_sec -= (uvp)->tv_sec; \
57 (vvp)->tv_nsec -= (uvp)->tv_nsec; \
58 if ((vvp)->tv_nsec < 0) { \
60 (vvp)->tv_nsec += 1000000000; \
66 * Gist of each client worker: build up to mflag connections at a time, and
67 * pump data in to them somewhat fairly until tflag connections have been
70 #define CONNECTION_MAGIC 0x87a3f56e
72 uint32_t conn_magic; /* Just magic. */
74 struct tcpp_header conn_header; /* Header buffer. */
75 u_int conn_header_sent; /* Header bytes sent. */
76 u_int64_t conn_data_sent; /* Data bytes sent.*/
79 static u_char buffer[256 * 1024]; /* Buffer to send. */
80 static pid_t *pid_list;
82 static int started; /* Number started so far. */
83 static int finished; /* Number finished so far. */
84 static int counter; /* IP number offset. */
86 static struct connection *
87 tcpp_client_newconn(void)
89 struct sockaddr_in sin;
90 struct connection *conn;
95 * Spread load over available IPs, roating through them as we go. No
96 * attempt to localize IPs to particular workers.
99 sin.sin_addr.s_addr = htonl(ntohl(localipbase.sin_addr.s_addr) +
100 (counter++ % Mflag));
102 fd = socket(PF_INET, SOCK_STREAM, 0);
106 if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0)
110 if (setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &i, sizeof(i)) < 0)
111 err(-1, "setsockopt");
114 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i)) < 0)
115 err(-1, "setsockopt");
119 if (bind(fd, (struct sockaddr *)&sin, sizeof(sin)) < 0)
123 if (connect(fd, (struct sockaddr *)&remoteip, sizeof(remoteip)) < 0 &&
124 errno != EINPROGRESS)
127 conn = malloc(sizeof(*conn));
130 bzero(conn, sizeof(*conn));
131 conn->conn_magic = CONNECTION_MAGIC;
133 conn->conn_header.th_magic = TCPP_MAGIC;
134 conn->conn_header.th_len = bflag;
135 tcpp_header_encode(&conn->conn_header);
137 EV_SET(&kev, fd, EVFILT_WRITE, EV_ADD, 0, 0, conn);
138 if (kevent(kq, &kev, 1, NULL, 0, NULL) < 0)
139 err(-1, "newconn kevent");
146 tcpp_client_closeconn(struct connection *conn)
149 close(conn->conn_fd);
150 bzero(conn, sizeof(*conn));
156 tcpp_client_handleconn(struct kevent *kev)
158 struct connection *conn;
162 if (conn->conn_magic != CONNECTION_MAGIC)
163 errx(-1, "tcpp_client_handleconn: magic");
165 if (conn->conn_header_sent < sizeof(conn->conn_header)) {
166 len = write(conn->conn_fd, ((u_char *)&conn->conn_header) +
167 conn->conn_header_sent, sizeof(conn->conn_header) -
168 conn->conn_header_sent);
170 tcpp_client_closeconn(conn);
171 err(-1, "tcpp_client_handleconn: header write");
174 tcpp_client_closeconn(conn);
175 errx(-1, "tcpp_client_handleconn: header write "
178 conn->conn_header_sent += len;
180 len = write(conn->conn_fd, buffer, min(sizeof(buffer),
181 bflag - conn->conn_data_sent));
183 tcpp_client_closeconn(conn);
184 err(-1, "tcpp_client_handleconn: data write");
187 tcpp_client_closeconn(conn);
188 errx(-1, "tcpp_client_handleconn: data write: "
191 conn->conn_data_sent += len;
192 if (conn->conn_data_sent >= bflag) {
196 tcpp_client_closeconn(conn);
202 tcpp_client_worker(int workernum)
204 struct kevent *kev_array;
205 int i, numevents, kev_bytes;
206 #if defined(CPU_SETSIZE) && 0
212 if (sysctlbyname(SYSCTLNAME_CPUS, &ncpus, &len, NULL, 0) < 0)
213 err(-1, "sysctlbyname: %s", SYSCTLNAME_CPUS);
214 if (len != sizeof(ncpus))
215 errx(-1, "sysctlbyname: %s: len %jd", SYSCTLNAME_CPUS,
219 CPU_SET(workernum % ncpus, &mask);
220 if (sched_setaffinity(0, CPU_SETSIZE, &mask) < 0)
221 err(-1, "sched_setaffinity");
223 setproctitle("tcpp_client %d", workernum);
226 * Add the worker number to the remote port.
228 remoteip.sin_port = htons(rflag + workernum);
230 kev_bytes = sizeof(*kev_array) * mflag;
231 kev_array = malloc(kev_bytes);
232 if (kev_array == NULL)
234 bzero(kev_array, kev_bytes);
240 while (finished < tflag) {
241 while ((started - finished < mflag) && (started < tflag))
242 (void)tcpp_client_newconn();
243 numevents = kevent(kq, NULL, 0, kev_array, mflag, NULL);
246 if (numevents > mflag)
247 errx(-1, "kevent: %d", numevents);
248 for (i = 0; i < numevents; i++)
249 tcpp_client_handleconn(&kev_array[i]);
251 /* printf("Worker %d done - %d finished\n", workernum, finished); */
257 struct timespec ts_start, ts_finish;
258 long cp_time_start[CPUSTATES], cp_time_finish[CPUSTATES];
262 int i, failed, status;
264 pid_list = malloc(sizeof(*pid_list) * pflag);
265 if (pid_list == NULL)
266 err(-1, "malloc pid_list");
267 bzero(pid_list, sizeof(*pid_list) * pflag);
272 size = sizeof(cp_time_start);
273 if (sysctlbyname(SYSCTLNAME_CPTIME, &cp_time_start, &size, NULL, 0)
275 err(-1, "sysctlbyname: %s", SYSCTLNAME_CPTIME);
276 if (clock_gettime(CLOCK_REALTIME, &ts_start) < 0)
277 err(-1, "clock_gettime");
278 for (i = 0; i < pflag; i++) {
282 for (i = 0; i < pflag; i++) {
283 if (pid_list[i] != 0)
284 (void)kill(pid_list[i], SIGKILL);
289 tcpp_client_worker(i);
299 for (i = 0; i < pflag; i++) {
300 if (pid_list[i] != 0) {
301 while (waitpid(pid_list[i], &status, 0) != pid_list[i]);
302 if (WEXITSTATUS(status) != 0)
306 if (clock_gettime(CLOCK_REALTIME, &ts_finish) < 0)
307 err(-1, "clock_gettime");
308 size = sizeof(cp_time_finish);
309 if (sysctlbyname(SYSCTLNAME_CPTIME, &cp_time_finish, &size, NULL, 0)
311 err(-1, "sysctlbyname: %s", SYSCTLNAME_CPTIME);
312 timespecsub(&ts_finish, &ts_start);
315 errx(-1, "Too many errors");
317 printf("%jd bytes transferred in %jd.%09jd seconds\n",
318 (bflag * tflag * pflag), (intmax_t)ts_finish.tv_sec,
319 (intmax_t)(ts_finish.tv_nsec));
322 printf("%d procs ", pflag);
324 printf("%f cps%s", (double)(pflag * tflag)/
325 (ts_finish.tv_sec + ts_finish.tv_nsec * 1e-9),
328 printf("%f Gbps%s", (double)(bflag * tflag * pflag * 8) /
329 (ts_finish.tv_sec + ts_finish.tv_nsec * 1e-9) * 1e-9,
334 for (i = 0; i < CPUSTATES; i++) {
335 cp_time_finish[i] -= cp_time_start[i];
336 ticks += cp_time_finish[i];
338 printf("user%% %lu nice%% %lu sys%% %lu intr%% %lu "
340 (100 * cp_time_finish[CP_USER]) / ticks,
341 (100 * cp_time_finish[CP_NICE]) / ticks,
342 (100 * cp_time_finish[CP_SYS]) / ticks,
343 (100 * cp_time_finish[CP_INTR]) / ticks,
344 (100 * cp_time_finish[CP_IDLE]) / ticks);