]> CyberLeo.Net >> Repos - FreeBSD/releng/10.0.git/blob - contrib/ofed/librdmacm/examples/rping.c
- Copy stable/10 (r259064) to releng/10.0 as part of the
[FreeBSD/releng/10.0.git] / contrib / ofed / librdmacm / examples / rping.c
1 /*
2  * Copyright (c) 2005 Ammasso, Inc. All rights reserved.
3  * Copyright (c) 2006 Open Grid Computing, Inc. All rights reserved.
4  *
5  * This software is available to you under a choice of one of two
6  * licenses.  You may choose to be licensed under the terms of the GNU
7  * General Public License (GPL) Version 2, available from the file
8  * COPYING in the main directory of this source tree, or the
9  * OpenIB.org BSD license below:
10  *
11  *     Redistribution and use in source and binary forms, with or
12  *     without modification, are permitted provided that the following
13  *     conditions are met:
14  *
15  *      - Redistributions of source code must retain the above
16  *        copyright notice, this list of conditions and the following
17  *        disclaimer.
18  *
19  *      - Redistributions in binary form must reproduce the above
20  *        copyright notice, this list of conditions and the following
21  *        disclaimer in the documentation and/or other materials
22  *        provided with the distribution.
23  *
24  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
25  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
26  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
27  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
28  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
29  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
30  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
31  * SOFTWARE.
32  */
33
#include <getopt.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <limits.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <netdb.h>
#include <byteswap.h>
#include <semaphore.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <inttypes.h>

#include <rdma/rdma_cma.h>
#include <infiniband/arch.h>
51
static int debug = 0;

/*
 * Print a debug message when "debug" is non-zero.
 *
 * Wrapped in do { } while (0) so the macro is a single statement.  The old
 * form, "if (debug) printf", silently captured the "else" of an enclosing
 * unbraced if/else.
 */
#define DEBUG_LOG(...) do { if (debug) printf(__VA_ARGS__); } while (0)
54
55 /*
56  * rping "ping/pong" loop:
57  *      client sends source rkey/addr/len
58  *      server receives source rkey/addr/len
59  *      server rdma reads "ping" data from source
60  *      server sends "go ahead" on rdma read completion
61  *      client sends sink rkey/addr/len
62  *      server receives sink rkey/addr/len
63  *      server rdma writes "pong" data to sink
64  *      server sends "go ahead" on rdma write completion
65  *      <repeat loop>
66  */
67
/*
 * States handed from the CM event handler and the CQ completion handler to
 * the main client or server thread, which reads them after sem_wait().
 *
 * Once CONNECTED, each ping walks RDMA_READ_ADV -> RDMA_WRITE_ADV ->
 * RDMA_WRITE_COMPLETE (the server additionally passes RDMA_READ_COMPLETE).
 * The numeric order matters: server_recv() tests "state <= CONNECTED".
 */
enum test_state {
	IDLE                = 1,
	CONNECT_REQUEST     = 2,
	ADDR_RESOLVED       = 3,
	ROUTE_RESOLVED      = 4,
	CONNECTED           = 5,
	RDMA_READ_ADV       = 6,
	RDMA_READ_COMPLETE  = 7,
	RDMA_WRITE_ADV      = 8,
	RDMA_WRITE_COMPLETE = 9,
	ERROR               = 10
};
87
/*
 * Buffer advertisement carried in a SEND/RECV message.  Fields are stored
 * in network byte order (rping_format_send() writes them, server_recv()
 * reads them).  This layout is what goes on the wire, so the field order
 * and widths must not change.
 */
struct rping_rdma_info {
	uint64_t buf;		/* address of the advertised buffer */
	uint32_t rkey;		/* rkey of the MR covering that buffer */
	uint32_t size;		/* buffer length in bytes */
};
93
/*
 * Default max buffer size for IO, and the send queue depth.
 * Fully parenthesized so the macros are safe inside larger expressions
 * (e.g. "x % RPING_BUFSIZE").
 */
#define RPING_BUFSIZE		(64 * 1024)
#define RPING_SQ_DEPTH		16

/*
 * Default string for print data and minimum buffer size.
 *
 * stringify() is the two-level form so that its argument is macro-expanded
 * before being turned into a string literal; INT_MAX comes from <limits.h>.
 * RPING_MIN_BUFSIZE is therefore large enough for RPING_MSG_FMT formatted
 * with any non-negative int, plus the terminating NUL.
 */
#define _stringify( _x ) # _x
#define stringify( _x ) _stringify( _x )

#define RPING_MSG_FMT           "rdma-ping-%d: "
#define RPING_MIN_BUFSIZE       (sizeof(stringify(INT_MAX)) + sizeof(RPING_MSG_FMT))
108
109 /*
110  * Control block struct.
111  */
112 struct rping_cb {
113         int server;                     /* 0 iff client */
114         pthread_t cqthread;
115         pthread_t persistent_server_thread;
116         struct ibv_comp_channel *channel;
117         struct ibv_cq *cq;
118         struct ibv_pd *pd;
119         struct ibv_qp *qp;
120
121         struct ibv_recv_wr rq_wr;       /* recv work request record */
122         struct ibv_sge recv_sgl;        /* recv single SGE */
123         struct rping_rdma_info recv_buf;/* malloc'd buffer */
124         struct ibv_mr *recv_mr;         /* MR associated with this buffer */
125
126         struct ibv_send_wr sq_wr;       /* send work request record */
127         struct ibv_sge send_sgl;
128         struct rping_rdma_info send_buf;/* single send buf */
129         struct ibv_mr *send_mr;
130
131         struct ibv_send_wr rdma_sq_wr;  /* rdma work request record */
132         struct ibv_sge rdma_sgl;        /* rdma single SGE */
133         char *rdma_buf;                 /* used as rdma sink */
134         struct ibv_mr *rdma_mr;
135
136         uint32_t remote_rkey;           /* remote guys RKEY */
137         uint64_t remote_addr;           /* remote guys TO */
138         uint32_t remote_len;            /* remote guys LEN */
139
140         char *start_buf;                /* rdma read src */
141         struct ibv_mr *start_mr;
142
143         enum test_state state;          /* used for cond/signalling */
144         sem_t sem;
145
146         struct sockaddr_storage sin;
147         uint16_t port;                  /* dst port in NBO */
148         int verbose;                    /* verbose logging */
149         int count;                      /* ping count */
150         int size;                       /* ping data size */
151         int validate;                   /* validate ping data */
152
153         /* CM stuff */
154         pthread_t cmthread;
155         struct rdma_event_channel *cm_channel;
156         struct rdma_cm_id *cm_id;       /* connection on client side,*/
157                                         /* listener on service side. */
158         struct rdma_cm_id *child_cm_id; /* connection on server side */
159 };
160
161 static int rping_cma_event_handler(struct rdma_cm_id *cma_id,
162                                     struct rdma_cm_event *event)
163 {
164         int ret = 0;
165         struct rping_cb *cb = cma_id->context;
166
167         DEBUG_LOG("cma_event type %s cma_id %p (%s)\n",
168                   rdma_event_str(event->event), cma_id,
169                   (cma_id == cb->cm_id) ? "parent" : "child");
170
171         switch (event->event) {
172         case RDMA_CM_EVENT_ADDR_RESOLVED:
173                 cb->state = ADDR_RESOLVED;
174                 ret = rdma_resolve_route(cma_id, 2000);
175                 if (ret) {
176                         cb->state = ERROR;
177                         perror("rdma_resolve_route");
178                         sem_post(&cb->sem);
179                 }
180                 break;
181
182         case RDMA_CM_EVENT_ROUTE_RESOLVED:
183                 cb->state = ROUTE_RESOLVED;
184                 sem_post(&cb->sem);
185                 break;
186
187         case RDMA_CM_EVENT_CONNECT_REQUEST:
188                 cb->state = CONNECT_REQUEST;
189                 cb->child_cm_id = cma_id;
190                 DEBUG_LOG("child cma %p\n", cb->child_cm_id);
191                 sem_post(&cb->sem);
192                 break;
193
194         case RDMA_CM_EVENT_ESTABLISHED:
195                 DEBUG_LOG("ESTABLISHED\n");
196
197                 /*
198                  * Server will wake up when first RECV completes.
199                  */
200                 if (!cb->server) {
201                         cb->state = CONNECTED;
202                 }
203                 sem_post(&cb->sem);
204                 break;
205
206         case RDMA_CM_EVENT_ADDR_ERROR:
207         case RDMA_CM_EVENT_ROUTE_ERROR:
208         case RDMA_CM_EVENT_CONNECT_ERROR:
209         case RDMA_CM_EVENT_UNREACHABLE:
210         case RDMA_CM_EVENT_REJECTED:
211                 fprintf(stderr, "cma event %s, error %d\n",
212                         rdma_event_str(event->event), event->status);
213                 sem_post(&cb->sem);
214                 ret = -1;
215                 break;
216
217         case RDMA_CM_EVENT_DISCONNECTED:
218                 fprintf(stderr, "%s DISCONNECT EVENT...\n",
219                         cb->server ? "server" : "client");
220                 sem_post(&cb->sem);
221                 break;
222
223         case RDMA_CM_EVENT_DEVICE_REMOVAL:
224                 fprintf(stderr, "cma detected device removal!!!!\n");
225                 ret = -1;
226                 break;
227
228         default:
229                 fprintf(stderr, "unhandled event: %s, ignoring\n",
230                         rdma_event_str(event->event));
231                 break;
232         }
233
234         return ret;
235 }
236
237 static int server_recv(struct rping_cb *cb, struct ibv_wc *wc)
238 {
239         if (wc->byte_len != sizeof(cb->recv_buf)) {
240                 fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len);
241                 return -1;
242         }
243
244         cb->remote_rkey = ntohl(cb->recv_buf.rkey);
245         cb->remote_addr = ntohll(cb->recv_buf.buf);
246         cb->remote_len  = ntohl(cb->recv_buf.size);
247         DEBUG_LOG("Received rkey %x addr %" PRIx64 " len %d from peer\n",
248                   cb->remote_rkey, cb->remote_addr, cb->remote_len);
249
250         if (cb->state <= CONNECTED || cb->state == RDMA_WRITE_COMPLETE)
251                 cb->state = RDMA_READ_ADV;
252         else
253                 cb->state = RDMA_WRITE_ADV;
254
255         return 0;
256 }
257
258 static int client_recv(struct rping_cb *cb, struct ibv_wc *wc)
259 {
260         if (wc->byte_len != sizeof(cb->recv_buf)) {
261                 fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len);
262                 return -1;
263         }
264
265         if (cb->state == RDMA_READ_ADV)
266                 cb->state = RDMA_WRITE_ADV;
267         else
268                 cb->state = RDMA_WRITE_COMPLETE;
269
270         return 0;
271 }
272
273 static int rping_cq_event_handler(struct rping_cb *cb)
274 {
275         struct ibv_wc wc;
276         struct ibv_recv_wr *bad_wr;
277         int ret;
278
279         while ((ret = ibv_poll_cq(cb->cq, 1, &wc)) == 1) {
280                 ret = 0;
281
282                 if (wc.status) {
283                         fprintf(stderr, "cq completion failed status %d\n",
284                                 wc.status);
285                         if (wc.status != IBV_WC_WR_FLUSH_ERR)
286                                 ret = -1;
287                         goto error;
288                 }
289
290                 switch (wc.opcode) {
291                 case IBV_WC_SEND:
292                         DEBUG_LOG("send completion\n");
293                         break;
294
295                 case IBV_WC_RDMA_WRITE:
296                         DEBUG_LOG("rdma write completion\n");
297                         cb->state = RDMA_WRITE_COMPLETE;
298                         sem_post(&cb->sem);
299                         break;
300
301                 case IBV_WC_RDMA_READ:
302                         DEBUG_LOG("rdma read completion\n");
303                         cb->state = RDMA_READ_COMPLETE;
304                         sem_post(&cb->sem);
305                         break;
306
307                 case IBV_WC_RECV:
308                         DEBUG_LOG("recv completion\n");
309                         ret = cb->server ? server_recv(cb, &wc) :
310                                            client_recv(cb, &wc);
311                         if (ret) {
312                                 fprintf(stderr, "recv wc error: %d\n", ret);
313                                 goto error;
314                         }
315
316                         ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
317                         if (ret) {
318                                 fprintf(stderr, "post recv error: %d\n", ret);
319                                 goto error;
320                         }
321                         sem_post(&cb->sem);
322                         break;
323
324                 default:
325                         DEBUG_LOG("unknown!!!!! completion\n");
326                         ret = -1;
327                         goto error;
328                 }
329         }
330         if (ret) {
331                 fprintf(stderr, "poll error %d\n", ret);
332                 goto error;
333         }
334         return 0;
335
336 error:
337         cb->state = ERROR;
338         sem_post(&cb->sem);
339         return ret;
340 }
341
342 static int rping_accept(struct rping_cb *cb)
343 {
344         struct rdma_conn_param conn_param;
345         int ret;
346
347         DEBUG_LOG("accepting client connection request\n");
348
349         memset(&conn_param, 0, sizeof conn_param);
350         conn_param.responder_resources = 1;
351         conn_param.initiator_depth = 1;
352
353         ret = rdma_accept(cb->child_cm_id, &conn_param);
354         if (ret) {
355                 perror("rdma_accept");
356                 return ret;
357         }
358
359         sem_wait(&cb->sem);
360         if (cb->state == ERROR) {
361                 fprintf(stderr, "wait for CONNECTED state %d\n", cb->state);
362                 return -1;
363         }
364         return 0;
365 }
366
367 static void rping_setup_wr(struct rping_cb *cb)
368 {
369         cb->recv_sgl.addr = (uint64_t) (unsigned long) &cb->recv_buf;
370         cb->recv_sgl.length = sizeof cb->recv_buf;
371         cb->recv_sgl.lkey = cb->recv_mr->lkey;
372         cb->rq_wr.sg_list = &cb->recv_sgl;
373         cb->rq_wr.num_sge = 1;
374
375         cb->send_sgl.addr = (uint64_t) (unsigned long) &cb->send_buf;
376         cb->send_sgl.length = sizeof cb->send_buf;
377         cb->send_sgl.lkey = cb->send_mr->lkey;
378
379         cb->sq_wr.opcode = IBV_WR_SEND;
380         cb->sq_wr.send_flags = IBV_SEND_SIGNALED;
381         cb->sq_wr.sg_list = &cb->send_sgl;
382         cb->sq_wr.num_sge = 1;
383
384         cb->rdma_sgl.addr = (uint64_t) (unsigned long) cb->rdma_buf;
385         cb->rdma_sgl.lkey = cb->rdma_mr->lkey;
386         cb->rdma_sq_wr.send_flags = IBV_SEND_SIGNALED;
387         cb->rdma_sq_wr.sg_list = &cb->rdma_sgl;
388         cb->rdma_sq_wr.num_sge = 1;
389 }
390
391 static int rping_setup_buffers(struct rping_cb *cb)
392 {
393         int ret;
394
395         DEBUG_LOG("rping_setup_buffers called on cb %p\n", cb);
396
397         cb->recv_mr = ibv_reg_mr(cb->pd, &cb->recv_buf, sizeof cb->recv_buf,
398                                  IBV_ACCESS_LOCAL_WRITE);
399         if (!cb->recv_mr) {
400                 fprintf(stderr, "recv_buf reg_mr failed\n");
401                 return errno;
402         }
403
404         cb->send_mr = ibv_reg_mr(cb->pd, &cb->send_buf, sizeof cb->send_buf, 0);
405         if (!cb->send_mr) {
406                 fprintf(stderr, "send_buf reg_mr failed\n");
407                 ret = errno;
408                 goto err1;
409         }
410
411         cb->rdma_buf = malloc(cb->size);
412         if (!cb->rdma_buf) {
413                 fprintf(stderr, "rdma_buf malloc failed\n");
414                 ret = -ENOMEM;
415                 goto err2;
416         }
417
418         cb->rdma_mr = ibv_reg_mr(cb->pd, cb->rdma_buf, cb->size,
419                                  IBV_ACCESS_LOCAL_WRITE |
420                                  IBV_ACCESS_REMOTE_READ |
421                                  IBV_ACCESS_REMOTE_WRITE);
422         if (!cb->rdma_mr) {
423                 fprintf(stderr, "rdma_buf reg_mr failed\n");
424                 ret = errno;
425                 goto err3;
426         }
427
428         if (!cb->server) {
429                 cb->start_buf = malloc(cb->size);
430                 if (!cb->start_buf) {
431                         fprintf(stderr, "start_buf malloc failed\n");
432                         ret = -ENOMEM;
433                         goto err4;
434                 }
435
436                 cb->start_mr = ibv_reg_mr(cb->pd, cb->start_buf, cb->size,
437                                           IBV_ACCESS_LOCAL_WRITE | 
438                                           IBV_ACCESS_REMOTE_READ |
439                                           IBV_ACCESS_REMOTE_WRITE);
440                 if (!cb->start_mr) {
441                         fprintf(stderr, "start_buf reg_mr failed\n");
442                         ret = errno;
443                         goto err5;
444                 }
445         }
446
447         rping_setup_wr(cb);
448         DEBUG_LOG("allocated & registered buffers...\n");
449         return 0;
450
451 err5:
452         free(cb->start_buf);
453 err4:
454         ibv_dereg_mr(cb->rdma_mr);
455 err3:
456         free(cb->rdma_buf);
457 err2:
458         ibv_dereg_mr(cb->send_mr);
459 err1:
460         ibv_dereg_mr(cb->recv_mr);
461         return ret;
462 }
463
464 static void rping_free_buffers(struct rping_cb *cb)
465 {
466         DEBUG_LOG("rping_free_buffers called on cb %p\n", cb);
467         ibv_dereg_mr(cb->recv_mr);
468         ibv_dereg_mr(cb->send_mr);
469         ibv_dereg_mr(cb->rdma_mr);
470         free(cb->rdma_buf);
471         if (!cb->server) {
472                 ibv_dereg_mr(cb->start_mr);
473                 free(cb->start_buf);
474         }
475 }
476
477 static int rping_create_qp(struct rping_cb *cb)
478 {
479         struct ibv_qp_init_attr init_attr;
480         int ret;
481
482         memset(&init_attr, 0, sizeof(init_attr));
483         init_attr.cap.max_send_wr = RPING_SQ_DEPTH;
484         init_attr.cap.max_recv_wr = 2;
485         init_attr.cap.max_recv_sge = 1;
486         init_attr.cap.max_send_sge = 1;
487         init_attr.qp_type = IBV_QPT_RC;
488         init_attr.send_cq = cb->cq;
489         init_attr.recv_cq = cb->cq;
490
491         if (cb->server) {
492                 ret = rdma_create_qp(cb->child_cm_id, cb->pd, &init_attr);
493                 if (!ret)
494                         cb->qp = cb->child_cm_id->qp;
495         } else {
496                 ret = rdma_create_qp(cb->cm_id, cb->pd, &init_attr);
497                 if (!ret)
498                         cb->qp = cb->cm_id->qp;
499         }
500
501         return ret;
502 }
503
504 static void rping_free_qp(struct rping_cb *cb)
505 {
506         ibv_destroy_qp(cb->qp);
507         ibv_destroy_cq(cb->cq);
508         ibv_destroy_comp_channel(cb->channel);
509         ibv_dealloc_pd(cb->pd);
510 }
511
512 static int rping_setup_qp(struct rping_cb *cb, struct rdma_cm_id *cm_id)
513 {
514         int ret;
515
516         cb->pd = ibv_alloc_pd(cm_id->verbs);
517         if (!cb->pd) {
518                 fprintf(stderr, "ibv_alloc_pd failed\n");
519                 return errno;
520         }
521         DEBUG_LOG("created pd %p\n", cb->pd);
522
523         cb->channel = ibv_create_comp_channel(cm_id->verbs);
524         if (!cb->channel) {
525                 fprintf(stderr, "ibv_create_comp_channel failed\n");
526                 ret = errno;
527                 goto err1;
528         }
529         DEBUG_LOG("created channel %p\n", cb->channel);
530
531         cb->cq = ibv_create_cq(cm_id->verbs, RPING_SQ_DEPTH * 2, cb,
532                                 cb->channel, 0);
533         if (!cb->cq) {
534                 fprintf(stderr, "ibv_create_cq failed\n");
535                 ret = errno;
536                 goto err2;
537         }
538         DEBUG_LOG("created cq %p\n", cb->cq);
539
540         ret = ibv_req_notify_cq(cb->cq, 0);
541         if (ret) {
542                 fprintf(stderr, "ibv_create_cq failed\n");
543                 ret = errno;
544                 goto err3;
545         }
546
547         ret = rping_create_qp(cb);
548         if (ret) {
549                 perror("rdma_create_qp");
550                 goto err3;
551         }
552         DEBUG_LOG("created qp %p\n", cb->qp);
553         return 0;
554
555 err3:
556         ibv_destroy_cq(cb->cq);
557 err2:
558         ibv_destroy_comp_channel(cb->channel);
559 err1:
560         ibv_dealloc_pd(cb->pd);
561         return ret;
562 }
563
564 static void *cm_thread(void *arg)
565 {
566         struct rping_cb *cb = arg;
567         struct rdma_cm_event *event;
568         int ret;
569
570         while (1) {
571                 ret = rdma_get_cm_event(cb->cm_channel, &event);
572                 if (ret) {
573                         perror("rdma_get_cm_event");
574                         exit(ret);
575                 }
576                 ret = rping_cma_event_handler(event->id, event);
577                 rdma_ack_cm_event(event);
578                 if (ret)
579                         exit(ret);
580         }
581 }
582
583 static void *cq_thread(void *arg)
584 {
585         struct rping_cb *cb = arg;
586         struct ibv_cq *ev_cq;
587         void *ev_ctx;
588         int ret;
589         
590         DEBUG_LOG("cq_thread started.\n");
591
592         while (1) {     
593                 pthread_testcancel();
594
595                 ret = ibv_get_cq_event(cb->channel, &ev_cq, &ev_ctx);
596                 if (ret) {
597                         fprintf(stderr, "Failed to get cq event!\n");
598                         pthread_exit(NULL);
599                 }
600                 if (ev_cq != cb->cq) {
601                         fprintf(stderr, "Unknown CQ!\n");
602                         pthread_exit(NULL);
603                 }
604                 ret = ibv_req_notify_cq(cb->cq, 0);
605                 if (ret) {
606                         fprintf(stderr, "Failed to set notify!\n");
607                         pthread_exit(NULL);
608                 }
609                 ret = rping_cq_event_handler(cb);
610                 ibv_ack_cq_events(cb->cq, 1);
611                 if (ret)
612                         pthread_exit(NULL);
613         }
614 }
615
616 static void rping_format_send(struct rping_cb *cb, char *buf, struct ibv_mr *mr)
617 {
618         struct rping_rdma_info *info = &cb->send_buf;
619
620         info->buf = htonll((uint64_t) (unsigned long) buf);
621         info->rkey = htonl(mr->rkey);
622         info->size = htonl(cb->size);
623
624         DEBUG_LOG("RDMA addr %" PRIx64" rkey %x len %d\n",
625                   ntohll(info->buf), ntohl(info->rkey), ntohl(info->size));
626 }
627
628 static int rping_test_server(struct rping_cb *cb)
629 {
630         struct ibv_send_wr *bad_wr;
631         int ret;
632
633         while (1) {
634                 /* Wait for client's Start STAG/TO/Len */
635                 sem_wait(&cb->sem);
636                 if (cb->state != RDMA_READ_ADV) {
637                         fprintf(stderr, "wait for RDMA_READ_ADV state %d\n",
638                                 cb->state);
639                         ret = -1;
640                         break;
641                 }
642
643                 DEBUG_LOG("server received sink adv\n");
644
645                 /* Issue RDMA Read. */
646                 cb->rdma_sq_wr.opcode = IBV_WR_RDMA_READ;
647                 cb->rdma_sq_wr.wr.rdma.rkey = cb->remote_rkey;
648                 cb->rdma_sq_wr.wr.rdma.remote_addr = cb->remote_addr;
649                 cb->rdma_sq_wr.sg_list->length = cb->remote_len;
650
651                 ret = ibv_post_send(cb->qp, &cb->rdma_sq_wr, &bad_wr);
652                 if (ret) {
653                         fprintf(stderr, "post send error %d\n", ret);
654                         break;
655                 }
656                 DEBUG_LOG("server posted rdma read req \n");
657
658                 /* Wait for read completion */
659                 sem_wait(&cb->sem);
660                 if (cb->state != RDMA_READ_COMPLETE) {
661                         fprintf(stderr, "wait for RDMA_READ_COMPLETE state %d\n",
662                                 cb->state);
663                         ret = -1;
664                         break;
665                 }
666                 DEBUG_LOG("server received read complete\n");
667
668                 /* Display data in recv buf */
669                 if (cb->verbose)
670                         printf("server ping data: %s\n", cb->rdma_buf);
671
672                 /* Tell client to continue */
673                 ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
674                 if (ret) {
675                         fprintf(stderr, "post send error %d\n", ret);
676                         break;
677                 }
678                 DEBUG_LOG("server posted go ahead\n");
679
680                 /* Wait for client's RDMA STAG/TO/Len */
681                 sem_wait(&cb->sem);
682                 if (cb->state != RDMA_WRITE_ADV) {
683                         fprintf(stderr, "wait for RDMA_WRITE_ADV state %d\n",
684                                 cb->state);
685                         ret = -1;
686                         break;
687                 }
688                 DEBUG_LOG("server received sink adv\n");
689
690                 /* RDMA Write echo data */
691                 cb->rdma_sq_wr.opcode = IBV_WR_RDMA_WRITE;
692                 cb->rdma_sq_wr.wr.rdma.rkey = cb->remote_rkey;
693                 cb->rdma_sq_wr.wr.rdma.remote_addr = cb->remote_addr;
694                 cb->rdma_sq_wr.sg_list->length = strlen(cb->rdma_buf) + 1;
695                 DEBUG_LOG("rdma write from lkey %x laddr %" PRIx64 " len %d\n",
696                           cb->rdma_sq_wr.sg_list->lkey,
697                           cb->rdma_sq_wr.sg_list->addr,
698                           cb->rdma_sq_wr.sg_list->length);
699
700                 ret = ibv_post_send(cb->qp, &cb->rdma_sq_wr, &bad_wr);
701                 if (ret) {
702                         fprintf(stderr, "post send error %d\n", ret);
703                         break;
704                 }
705
706                 /* Wait for completion */
707                 ret = sem_wait(&cb->sem);
708                 if (cb->state != RDMA_WRITE_COMPLETE) {
709                         fprintf(stderr, "wait for RDMA_WRITE_COMPLETE state %d\n",
710                                 cb->state);
711                         ret = -1;
712                         break;
713                 }
714                 DEBUG_LOG("server rdma write complete \n");
715
716                 /* Tell client to begin again */
717                 ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
718                 if (ret) {
719                         fprintf(stderr, "post send error %d\n", ret);
720                         break;
721                 }
722                 DEBUG_LOG("server posted go ahead\n");
723         }
724
725         return ret;
726 }
727
728 static int rping_bind_server(struct rping_cb *cb)
729 {
730         int ret;
731
732         if (cb->sin.ss_family == AF_INET)
733                 ((struct sockaddr_in *) &cb->sin)->sin_port = cb->port;
734         else
735                 ((struct sockaddr_in6 *) &cb->sin)->sin6_port = cb->port;
736
737         ret = rdma_bind_addr(cb->cm_id, (struct sockaddr *) &cb->sin);
738         if (ret) {
739                 perror("rdma_bind_addr");
740                 return ret;
741         }
742         DEBUG_LOG("rdma_bind_addr successful\n");
743
744         DEBUG_LOG("rdma_listen\n");
745         ret = rdma_listen(cb->cm_id, 3);
746         if (ret) {
747                 perror("rdma_listen");
748                 return ret;
749         }
750
751         return 0;
752 }
753
754 static struct rping_cb *clone_cb(struct rping_cb *listening_cb)
755 {
756         struct rping_cb *cb = malloc(sizeof *cb);
757         if (!cb)
758                 return NULL;
759         *cb = *listening_cb;
760         cb->child_cm_id->context = cb;
761         return cb;
762 }
763
/* Release a per-connection control block allocated by clone_cb(). */
static void
free_cb(struct rping_cb *cb)
{
	free(cb);
}
768
769 static void *rping_persistent_server_thread(void *arg)
770 {
771         struct rping_cb *cb = arg;
772         struct ibv_recv_wr *bad_wr;
773         int ret;
774
775         ret = rping_setup_qp(cb, cb->child_cm_id);
776         if (ret) {
777                 fprintf(stderr, "setup_qp failed: %d\n", ret);
778                 goto err0;
779         }
780
781         ret = rping_setup_buffers(cb);
782         if (ret) {
783                 fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
784                 goto err1;
785         }
786
787         ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
788         if (ret) {
789                 fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
790                 goto err2;
791         }
792
793         pthread_create(&cb->cqthread, NULL, cq_thread, cb);
794
795         ret = rping_accept(cb);
796         if (ret) {
797                 fprintf(stderr, "connect error %d\n", ret);
798                 goto err3;
799         }
800
801         rping_test_server(cb);
802         rdma_disconnect(cb->child_cm_id);
803         rping_free_buffers(cb);
804         rping_free_qp(cb);
805         pthread_cancel(cb->cqthread);
806         pthread_join(cb->cqthread, NULL);
807         rdma_destroy_id(cb->child_cm_id);
808         free_cb(cb);
809         return NULL;
810 err3:
811         pthread_cancel(cb->cqthread);
812         pthread_join(cb->cqthread, NULL);
813 err2:
814         rping_free_buffers(cb);
815 err1:
816         rping_free_qp(cb);
817 err0:
818         free_cb(cb);
819         return NULL;
820 }
821
822 static int rping_run_persistent_server(struct rping_cb *listening_cb)
823 {
824         int ret;
825         struct rping_cb *cb;
826
827         ret = rping_bind_server(listening_cb);
828         if (ret)
829                 return ret;
830
831         while (1) {
832                 sem_wait(&listening_cb->sem);
833                 if (listening_cb->state != CONNECT_REQUEST) {
834                         fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
835                                 listening_cb->state);
836                         return -1;
837                 }
838
839                 cb = clone_cb(listening_cb);
840                 if (!cb)
841                         return -1;
842                 pthread_create(&cb->persistent_server_thread, NULL, rping_persistent_server_thread, cb);
843         }
844         return 0;
845 }
846
847 static int rping_run_server(struct rping_cb *cb)
848 {
849         struct ibv_recv_wr *bad_wr;
850         int ret;
851
852         ret = rping_bind_server(cb);
853         if (ret)
854                 return ret;
855
856         sem_wait(&cb->sem);
857         if (cb->state != CONNECT_REQUEST) {
858                 fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
859                         cb->state);
860                 return -1;
861         }
862
863         ret = rping_setup_qp(cb, cb->child_cm_id);
864         if (ret) {
865                 fprintf(stderr, "setup_qp failed: %d\n", ret);
866                 return ret;
867         }
868
869         ret = rping_setup_buffers(cb);
870         if (ret) {
871                 fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
872                 goto err1;
873         }
874
875         ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
876         if (ret) {
877                 fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
878                 goto err2;
879         }
880
881         pthread_create(&cb->cqthread, NULL, cq_thread, cb);
882
883         ret = rping_accept(cb);
884         if (ret) {
885                 fprintf(stderr, "connect error %d\n", ret);
886                 goto err2;
887         }
888
889         rping_test_server(cb);
890         rdma_disconnect(cb->child_cm_id);
891         rdma_destroy_id(cb->child_cm_id);
892 err2:
893         rping_free_buffers(cb);
894 err1:
895         rping_free_qp(cb);
896
897         return ret;
898 }
899
900 static int rping_test_client(struct rping_cb *cb)
901 {
902         int ping, start, cc, i, ret = 0;
903         struct ibv_send_wr *bad_wr;
904         unsigned char c;
905
906         start = 65;
907         for (ping = 0; !cb->count || ping < cb->count; ping++) {
908                 cb->state = RDMA_READ_ADV;
909
910                 /* Put some ascii text in the buffer. */
911                 cc = sprintf(cb->start_buf, RPING_MSG_FMT, ping);
912                 for (i = cc, c = start; i < cb->size; i++) {
913                         cb->start_buf[i] = c;
914                         c++;
915                         if (c > 122)
916                                 c = 65;
917                 }
918                 start++;
919                 if (start > 122)
920                         start = 65;
921                 cb->start_buf[cb->size - 1] = 0;
922
923                 rping_format_send(cb, cb->start_buf, cb->start_mr);
924                 ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
925                 if (ret) {
926                         fprintf(stderr, "post send error %d\n", ret);
927                         break;
928                 }
929
930                 /* Wait for server to ACK */
931                 sem_wait(&cb->sem);
932                 if (cb->state != RDMA_WRITE_ADV) {
933                         fprintf(stderr, "wait for RDMA_WRITE_ADV state %d\n",
934                                 cb->state);
935                         ret = -1;
936                         break;
937                 }
938
939                 rping_format_send(cb, cb->rdma_buf, cb->rdma_mr);
940                 ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
941                 if (ret) {
942                         fprintf(stderr, "post send error %d\n", ret);
943                         break;
944                 }
945
946                 /* Wait for the server to say the RDMA Write is complete. */
947                 sem_wait(&cb->sem);
948                 if (cb->state != RDMA_WRITE_COMPLETE) {
949                         fprintf(stderr, "wait for RDMA_WRITE_COMPLETE state %d\n",
950                                 cb->state);
951                         ret = -1;
952                         break;
953                 }
954
955                 if (cb->validate)
956                         if (memcmp(cb->start_buf, cb->rdma_buf, cb->size)) {
957                                 fprintf(stderr, "data mismatch!\n");
958                                 ret = -1;
959                                 break;
960                         }
961
962                 if (cb->verbose)
963                         printf("ping data: %s\n", cb->rdma_buf);
964         }
965
966         return ret;
967 }
968
969 static int rping_connect_client(struct rping_cb *cb)
970 {
971         struct rdma_conn_param conn_param;
972         int ret;
973
974         memset(&conn_param, 0, sizeof conn_param);
975         conn_param.responder_resources = 1;
976         conn_param.initiator_depth = 1;
977         conn_param.retry_count = 10;
978
979         ret = rdma_connect(cb->cm_id, &conn_param);
980         if (ret) {
981                 perror("rdma_connect");
982                 return ret;
983         }
984
985         sem_wait(&cb->sem);
986         if (cb->state != CONNECTED) {
987                 fprintf(stderr, "wait for CONNECTED state %d\n", cb->state);
988                 return -1;
989         }
990
991         DEBUG_LOG("rmda_connect successful\n");
992         return 0;
993 }
994
995 static int rping_bind_client(struct rping_cb *cb)
996 {
997         int ret;
998
999         if (cb->sin.ss_family == AF_INET)
1000                 ((struct sockaddr_in *) &cb->sin)->sin_port = cb->port;
1001         else
1002                 ((struct sockaddr_in6 *) &cb->sin)->sin6_port = cb->port;
1003
1004         ret = rdma_resolve_addr(cb->cm_id, NULL, (struct sockaddr *) &cb->sin, 2000);
1005         if (ret) {
1006                 perror("rdma_resolve_addr");
1007                 return ret;
1008         }
1009
1010         sem_wait(&cb->sem);
1011         if (cb->state != ROUTE_RESOLVED) {
1012                 fprintf(stderr, "waiting for addr/route resolution state %d\n",
1013                         cb->state);
1014                 return -1;
1015         }
1016
1017         DEBUG_LOG("rdma_resolve_addr - rdma_resolve_route successful\n");
1018         return 0;
1019 }
1020
1021 static int rping_run_client(struct rping_cb *cb)
1022 {
1023         struct ibv_recv_wr *bad_wr;
1024         int ret;
1025
1026         ret = rping_bind_client(cb);
1027         if (ret)
1028                 return ret;
1029
1030         ret = rping_setup_qp(cb, cb->cm_id);
1031         if (ret) {
1032                 fprintf(stderr, "setup_qp failed: %d\n", ret);
1033                 return ret;
1034         }
1035
1036         ret = rping_setup_buffers(cb);
1037         if (ret) {
1038                 fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
1039                 goto err1;
1040         }
1041
1042         ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
1043         if (ret) {
1044                 fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
1045                 goto err2;
1046         }
1047
1048         pthread_create(&cb->cqthread, NULL, cq_thread, cb);
1049
1050         ret = rping_connect_client(cb);
1051         if (ret) {
1052                 fprintf(stderr, "connect error %d\n", ret);
1053                 goto err2;
1054         }
1055
1056         rping_test_client(cb);
1057         rdma_disconnect(cb->cm_id);
1058 err2:
1059         rping_free_buffers(cb);
1060 err1:
1061         rping_free_qp(cb);
1062
1063         return ret;
1064 }
1065
/*
 * Resolve dst (a host name or numeric IPv4/IPv6 address) with getaddrinfo()
 * and copy the first result's socket address into *addr.
 *
 * addr must have room for a struct sockaddr_in6; main() passes &cb->sin.
 *
 * Returns 0 on success, the getaddrinfo() error code if resolution fails,
 * or -1 if the first result is neither IPv4 nor IPv6.
 */
static int get_addr(const char *dst, struct sockaddr *addr)
{
	struct addrinfo *res;
	int ret;

	ret = getaddrinfo(dst, NULL, NULL, &res);
	if (ret) {
		/* Diagnostics go to stderr and include the resolver's reason. */
		fprintf(stderr,
			"getaddrinfo failed - invalid hostname or IP address: %s\n",
			gai_strerror(ret));
		return ret;
	}

	switch (res->ai_family) {
	case PF_INET:
		memcpy(addr, res->ai_addr, sizeof(struct sockaddr_in));
		break;
	case PF_INET6:
		memcpy(addr, res->ai_addr, sizeof(struct sockaddr_in6));
		break;
	default:
		fprintf(stderr, "unsupported address family %d\n",
			res->ai_family);
		ret = -1;
		break;
	}

	freeaddrinfo(res);
	return ret;
}
1087
/*
 * Print the command-line synopsis and option summary to stdout.
 *
 * name is the program name printed at the start of each synopsis line.
 * -P (persistent server) only applies to the server form, so it is listed
 * in the -s synopsis as well as in the option table.
 */
static void usage(const char *name)
{
	printf("%s -s [-vVd] [-P] [-S size] [-C count] [-a addr] [-p port]\n",
	       name);
	printf("%s -c [-vVd] [-S size] [-C count] -a addr [-p port]\n",
	       name);
	printf("\t-c\t\tclient side\n");
	printf("\t-s\t\tserver side.  To bind to any address with IPv6 use -a ::0\n");
	printf("\t-v\t\tdisplay ping data to stdout\n");
	printf("\t-V\t\tvalidate ping data\n");
	printf("\t-d\t\tdebug printfs\n");
	printf("\t-S size \tping data size\n");
	printf("\t-C count\tping count times\n");
	printf("\t-a addr\t\taddress\n");
	printf("\t-p port\t\tport\n");
	printf("\t-P\t\tpersistent server mode allowing multiple connections\n");
}
1105
1106 int main(int argc, char *argv[])
1107 {
1108         struct rping_cb *cb;
1109         int op;
1110         int ret = 0;
1111         int persistent_server = 0;
1112
1113         cb = malloc(sizeof(*cb));
1114         if (!cb)
1115                 return -ENOMEM;
1116
1117         memset(cb, 0, sizeof(*cb));
1118         cb->server = -1;
1119         cb->state = IDLE;
1120         cb->size = 64;
1121         cb->sin.ss_family = PF_INET;
1122         cb->port = htons(7174);
1123         sem_init(&cb->sem, 0, 0);
1124
1125         opterr = 0;
1126         while ((op=getopt(argc, argv, "a:Pp:C:S:t:scvVd")) != -1) {
1127                 switch (op) {
1128                 case 'a':
1129                         ret = get_addr(optarg, (struct sockaddr *) &cb->sin);
1130                         break;
1131                 case 'P':
1132                         persistent_server = 1;
1133                         break;
1134                 case 'p':
1135                         cb->port = htons(atoi(optarg));
1136                         DEBUG_LOG("port %d\n", (int) atoi(optarg));
1137                         break;
1138                 case 's':
1139                         cb->server = 1;
1140                         DEBUG_LOG("server\n");
1141                         break;
1142                 case 'c':
1143                         cb->server = 0;
1144                         DEBUG_LOG("client\n");
1145                         break;
1146                 case 'S':
1147                         cb->size = atoi(optarg);
1148                         if ((cb->size < RPING_MIN_BUFSIZE) ||
1149                             (cb->size > (RPING_BUFSIZE - 1))) {
1150                                 fprintf(stderr, "Invalid size %d "
1151                                        "(valid range is %Zd to %d)\n",
1152                                        cb->size, RPING_MIN_BUFSIZE, RPING_BUFSIZE);
1153                                 ret = EINVAL;
1154                         } else
1155                                 DEBUG_LOG("size %d\n", (int) atoi(optarg));
1156                         break;
1157                 case 'C':
1158                         cb->count = atoi(optarg);
1159                         if (cb->count < 0) {
1160                                 fprintf(stderr, "Invalid count %d\n",
1161                                         cb->count);
1162                                 ret = EINVAL;
1163                         } else
1164                                 DEBUG_LOG("count %d\n", (int) cb->count);
1165                         break;
1166                 case 'v':
1167                         cb->verbose++;
1168                         DEBUG_LOG("verbose\n");
1169                         break;
1170                 case 'V':
1171                         cb->validate++;
1172                         DEBUG_LOG("validate data\n");
1173                         break;
1174                 case 'd':
1175                         debug++;
1176                         break;
1177                 default:
1178                         usage("rping");
1179                         ret = EINVAL;
1180                         goto out;
1181                 }
1182         }
1183         if (ret)
1184                 goto out;
1185
1186         if (cb->server == -1) {
1187                 usage("rping");
1188                 ret = EINVAL;
1189                 goto out;
1190         }
1191
1192         cb->cm_channel = rdma_create_event_channel();
1193         if (!cb->cm_channel) {
1194                 perror("rdma_create_event_channel");
1195                 goto out;
1196         }
1197
1198         ret = rdma_create_id(cb->cm_channel, &cb->cm_id, cb, RDMA_PS_TCP);
1199         if (ret) {
1200                 perror("rdma_create_id");
1201                 goto out2;
1202         }
1203         DEBUG_LOG("created cm_id %p\n", cb->cm_id);
1204
1205         pthread_create(&cb->cmthread, NULL, cm_thread, cb);
1206
1207         if (cb->server) {
1208                 if (persistent_server)
1209                         ret = rping_run_persistent_server(cb);
1210                 else
1211                         ret = rping_run_server(cb);
1212         } else
1213                 ret = rping_run_client(cb);
1214
1215         DEBUG_LOG("destroy cm_id %p\n", cb->cm_id);
1216         rdma_destroy_id(cb->cm_id);
1217 out2:
1218         rdma_destroy_event_channel(cb->cm_channel);
1219 out:
1220         free(cb);
1221         return ret;
1222 }