]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - contrib/ofed/librdmacm/examples/rping.c
Merge OpenSSL 1.0.2f.
[FreeBSD/FreeBSD.git] / contrib / ofed / librdmacm / examples / rping.c
1 /*
2  * Copyright (c) 2005 Ammasso, Inc. All rights reserved.
3  * Copyright (c) 2006 Open Grid Computing, Inc. All rights reserved.
4  *
5  * This software is available to you under a choice of one of two
6  * licenses.  You may choose to be licensed under the terms of the GNU
7  * General Public License (GPL) Version 2, available from the file
8  * COPYING in the main directory of this source tree, or the
9  * OpenIB.org BSD license below:
10  *
11  *     Redistribution and use in source and binary forms, with or
12  *     without modification, are permitted provided that the following
13  *     conditions are met:
14  *
15  *      - Redistributions of source code must retain the above
16  *        copyright notice, this list of conditions and the following
17  *        disclaimer.
18  *
19  *      - Redistributions in binary form must reproduce the above
20  *        copyright notice, this list of conditions and the following
21  *        disclaimer in the documentation and/or other materials
22  *        provided with the distribution.
23  *
24  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
25  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
26  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
27  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
28  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
29  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
30  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
31  * SOFTWARE.
32  */
33
34 #include <getopt.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <stdio.h>
38 #include <errno.h>
39 #include <sys/types.h>
40 #include <netinet/in.h>
41 #include <sys/socket.h>
42 #include <netdb.h>
43 #include <semaphore.h>
44 #include <arpa/inet.h>
45 #include <pthread.h>
46 #include <inttypes.h>
47
48 #include <rdma/rdma_cma.h>
49 #include <infiniband/arch.h>
50
/* Non-zero enables DEBUG_LOG output. */
static int debug = 0;

/*
 * printf-style trace helper; prints only while `debug` is non-zero.
 *
 * Wrapped in do/while(0) so that an invocation is exactly one statement
 * and cannot capture the `else` of an enclosing unbraced `if`.
 */
#define DEBUG_LOG(...) do { if (debug) printf(__VA_ARGS__); } while (0)
53
54 /*
55  * rping "ping/pong" loop:
56  *      client sends source rkey/addr/len
57  *      server receives source rkey/addr/len
58  *      server rdma reads "ping" data from source
59  *      server sends "go ahead" on rdma read completion
60  *      client sends sink rkey/addr/len
61  *      server receives sink rkey/addr/len
62  *      server rdma writes "pong" data to sink
63  *      server sends "go ahead" on rdma write completion
64  *      <repeat loop>
65  */
66
67 /*
68  * These states are used to signal events between the completion handler
69  * and the main client or server thread.
70  *
71  * Once CONNECTED, they cycle through RDMA_READ_ADV, RDMA_WRITE_ADV, 
72  * and RDMA_WRITE_COMPLETE for each ping.
73  */
enum test_state {
        IDLE                = 1,
        CONNECT_REQUEST     = 2,    /* set on RDMA_CM_EVENT_CONNECT_REQUEST */
        ADDR_RESOLVED       = 3,    /* set on RDMA_CM_EVENT_ADDR_RESOLVED */
        ROUTE_RESOLVED      = 4,    /* set on RDMA_CM_EVENT_ROUTE_RESOLVED */
        CONNECTED           = 5,    /* set on ESTABLISHED, client side only */
        /* server_recv() compares against CONNECTED: keep the order above. */
        RDMA_READ_ADV       = 6,    /* source buffer has been advertised */
        RDMA_READ_COMPLETE  = 7,    /* set on IBV_WC_RDMA_READ completion */
        RDMA_WRITE_ADV      = 8,    /* sink buffer has been advertised */
        RDMA_WRITE_COMPLETE = 9,    /* write finished / acknowledged */
        ERROR               = 10    /* set by the CQ handler on any failure */
};
86
/*
 * Buffer advertisement exchanged between the peers in SEND messages.
 * rping_format_send() fills it with htonll()/htonl() and server_recv()
 * decodes it with ntohll()/ntohl(), so all fields travel in network byte
 * order.  The receive path rejects any message whose length differs from
 * sizeof(struct rping_rdma_info); do not reorder or resize the fields.
 */
struct rping_rdma_info {
        uint64_t buf;           /* address of the advertised buffer */
        uint32_t rkey;          /* rkey of the MR covering that buffer */
        uint32_t size;          /* length of the advertised buffer */
};
92
/*
 * Default max buffer size for IO and depth of the send queue.
 * The expansion is parenthesized so the macro stays a single operand
 * when used inside a larger expression (e.g. `x / RPING_BUFSIZE`).
 */
#define RPING_BUFSIZE (64*1024)
#define RPING_SQ_DEPTH 16
98
/*
 * Default string for print data and minimum buffer size.
 *
 * stringify() expands its argument before turning it into a string
 * literal; _stringify() is the raw `#` step.
 * NOTE(review): INT_MAX is only expanded to its digits when <limits.h>
 * is visible at this point; otherwise the literal text "INT_MAX" is
 * measured -- confirm which is intended.
 */
#define _stringify( _x ) # _x
#define stringify( _x ) _stringify( _x )

#define RPING_MSG_FMT           "rdma-ping-%d: "
/* Parenthesized so the sum behaves as one operand in larger expressions. */
#define RPING_MIN_BUFSIZE       (sizeof(stringify(INT_MAX)) + sizeof(RPING_MSG_FMT))
107
/*
 * Control block struct.
 *
 * One instance describes one rping endpoint: the verbs objects, the
 * registered buffers and pre-built work requests, the state machine
 * shared between the event/completion threads and the main test thread,
 * and the connection-manager handles.  In persistent-server mode
 * clone_cb() makes a copy of the listening block for every connection.
 */
struct rping_cb {
        int server;                     /* 0 iff client */
        pthread_t cqthread;             /* runs cq_thread() */
        pthread_t persistent_server_thread; /* runs rping_persistent_server_thread() */
        struct ibv_comp_channel *channel; /* completion channel the CQ notifies on */
        struct ibv_cq *cq;              /* single CQ used for both send and recv */
        struct ibv_pd *pd;
        struct ibv_qp *qp;

        struct ibv_recv_wr rq_wr;       /* recv work request record */
        struct ibv_sge recv_sgl;        /* recv single SGE */
        struct rping_rdma_info recv_buf;/* embedded recv buffer (not malloc'd) */
        struct ibv_mr *recv_mr;         /* MR associated with this buffer */

        struct ibv_send_wr sq_wr;       /* send work request record */
        struct ibv_sge send_sgl;
        struct rping_rdma_info send_buf;/* single send buf */
        struct ibv_mr *send_mr;

        struct ibv_send_wr rdma_sq_wr;  /* rdma work request record */
        struct ibv_sge rdma_sgl;        /* rdma single SGE */
        char *rdma_buf;                 /* used as rdma sink; malloc'd, cb->size bytes */
        struct ibv_mr *rdma_mr;

        uint32_t remote_rkey;           /* remote guys RKEY */
        uint64_t remote_addr;           /* remote guys TO */
        uint32_t remote_len;            /* remote guys LEN */

        char *start_buf;                /* rdma read src; allocated on the client only */
        struct ibv_mr *start_mr;

        enum test_state state;          /* used for cond/signalling */
        sem_t sem;                      /* posted by the event/completion handlers */

        struct sockaddr_storage sin;
        uint16_t port;                  /* dst port in NBO */
        int verbose;                    /* verbose logging */
        int count;                      /* ping count */
        int size;                       /* ping data size */
        int validate;                   /* validate ping data */

        /* CM stuff */
        pthread_t cmthread;
        struct rdma_event_channel *cm_channel;
        struct rdma_cm_id *cm_id;       /* connection on client side,*/
                                        /* listener on service side. */
        struct rdma_cm_id *child_cm_id; /* connection on server side */
};
159
160 static int rping_cma_event_handler(struct rdma_cm_id *cma_id,
161                                     struct rdma_cm_event *event)
162 {
163         int ret = 0;
164         struct rping_cb *cb = cma_id->context;
165
166         DEBUG_LOG("cma_event type %s cma_id %p (%s)\n",
167                   rdma_event_str(event->event), cma_id,
168                   (cma_id == cb->cm_id) ? "parent" : "child");
169
170         switch (event->event) {
171         case RDMA_CM_EVENT_ADDR_RESOLVED:
172                 cb->state = ADDR_RESOLVED;
173                 ret = rdma_resolve_route(cma_id, 2000);
174                 if (ret) {
175                         cb->state = ERROR;
176                         perror("rdma_resolve_route");
177                         sem_post(&cb->sem);
178                 }
179                 break;
180
181         case RDMA_CM_EVENT_ROUTE_RESOLVED:
182                 cb->state = ROUTE_RESOLVED;
183                 sem_post(&cb->sem);
184                 break;
185
186         case RDMA_CM_EVENT_CONNECT_REQUEST:
187                 cb->state = CONNECT_REQUEST;
188                 cb->child_cm_id = cma_id;
189                 DEBUG_LOG("child cma %p\n", cb->child_cm_id);
190                 sem_post(&cb->sem);
191                 break;
192
193         case RDMA_CM_EVENT_ESTABLISHED:
194                 DEBUG_LOG("ESTABLISHED\n");
195
196                 /*
197                  * Server will wake up when first RECV completes.
198                  */
199                 if (!cb->server) {
200                         cb->state = CONNECTED;
201                 }
202                 sem_post(&cb->sem);
203                 break;
204
205         case RDMA_CM_EVENT_ADDR_ERROR:
206         case RDMA_CM_EVENT_ROUTE_ERROR:
207         case RDMA_CM_EVENT_CONNECT_ERROR:
208         case RDMA_CM_EVENT_UNREACHABLE:
209         case RDMA_CM_EVENT_REJECTED:
210                 fprintf(stderr, "cma event %s, error %d\n",
211                         rdma_event_str(event->event), event->status);
212                 sem_post(&cb->sem);
213                 ret = -1;
214                 break;
215
216         case RDMA_CM_EVENT_DISCONNECTED:
217                 fprintf(stderr, "%s DISCONNECT EVENT...\n",
218                         cb->server ? "server" : "client");
219                 sem_post(&cb->sem);
220                 break;
221
222         case RDMA_CM_EVENT_DEVICE_REMOVAL:
223                 fprintf(stderr, "cma detected device removal!!!!\n");
224                 ret = -1;
225                 break;
226
227         default:
228                 fprintf(stderr, "unhandled event: %s, ignoring\n",
229                         rdma_event_str(event->event));
230                 break;
231         }
232
233         return ret;
234 }
235
236 static int server_recv(struct rping_cb *cb, struct ibv_wc *wc)
237 {
238         if (wc->byte_len != sizeof(cb->recv_buf)) {
239                 fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len);
240                 return -1;
241         }
242
243         cb->remote_rkey = ntohl(cb->recv_buf.rkey);
244         cb->remote_addr = ntohll(cb->recv_buf.buf);
245         cb->remote_len  = ntohl(cb->recv_buf.size);
246         DEBUG_LOG("Received rkey %x addr %" PRIx64 " len %d from peer\n",
247                   cb->remote_rkey, cb->remote_addr, cb->remote_len);
248
249         if (cb->state <= CONNECTED || cb->state == RDMA_WRITE_COMPLETE)
250                 cb->state = RDMA_READ_ADV;
251         else
252                 cb->state = RDMA_WRITE_ADV;
253
254         return 0;
255 }
256
257 static int client_recv(struct rping_cb *cb, struct ibv_wc *wc)
258 {
259         if (wc->byte_len != sizeof(cb->recv_buf)) {
260                 fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len);
261                 return -1;
262         }
263
264         if (cb->state == RDMA_READ_ADV)
265                 cb->state = RDMA_WRITE_ADV;
266         else
267                 cb->state = RDMA_WRITE_COMPLETE;
268
269         return 0;
270 }
271
272 static int rping_cq_event_handler(struct rping_cb *cb)
273 {
274         struct ibv_wc wc;
275         struct ibv_recv_wr *bad_wr;
276         int ret;
277
278         while ((ret = ibv_poll_cq(cb->cq, 1, &wc)) == 1) {
279                 ret = 0;
280
281                 if (wc.status) {
282                         if (wc.status != IBV_WC_WR_FLUSH_ERR)
283                                 fprintf(stderr, "cq completion failed status %d\n",
284                                         wc.status);
285                         ret = -1;
286                         goto error;
287                 }
288
289                 switch (wc.opcode) {
290                 case IBV_WC_SEND:
291                         DEBUG_LOG("send completion\n");
292                         break;
293
294                 case IBV_WC_RDMA_WRITE:
295                         DEBUG_LOG("rdma write completion\n");
296                         cb->state = RDMA_WRITE_COMPLETE;
297                         sem_post(&cb->sem);
298                         break;
299
300                 case IBV_WC_RDMA_READ:
301                         DEBUG_LOG("rdma read completion\n");
302                         cb->state = RDMA_READ_COMPLETE;
303                         sem_post(&cb->sem);
304                         break;
305
306                 case IBV_WC_RECV:
307                         DEBUG_LOG("recv completion\n");
308                         ret = cb->server ? server_recv(cb, &wc) :
309                                            client_recv(cb, &wc);
310                         if (ret) {
311                                 fprintf(stderr, "recv wc error: %d\n", ret);
312                                 goto error;
313                         }
314
315                         ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
316                         if (ret) {
317                                 fprintf(stderr, "post recv error: %d\n", ret);
318                                 goto error;
319                         }
320                         sem_post(&cb->sem);
321                         break;
322
323                 default:
324                         DEBUG_LOG("unknown!!!!! completion\n");
325                         ret = -1;
326                         goto error;
327                 }
328         }
329         if (ret) {
330                 fprintf(stderr, "poll error %d\n", ret);
331                 goto error;
332         }
333         return 0;
334
335 error:
336         cb->state = ERROR;
337         sem_post(&cb->sem);
338         return ret;
339 }
340
341 static int rping_accept(struct rping_cb *cb)
342 {
343         struct rdma_conn_param conn_param;
344         int ret;
345
346         DEBUG_LOG("accepting client connection request\n");
347
348         memset(&conn_param, 0, sizeof conn_param);
349         conn_param.responder_resources = 1;
350         conn_param.initiator_depth = 1;
351
352         ret = rdma_accept(cb->child_cm_id, &conn_param);
353         if (ret) {
354                 perror("rdma_accept");
355                 return ret;
356         }
357
358         sem_wait(&cb->sem);
359         if (cb->state == ERROR) {
360                 fprintf(stderr, "wait for CONNECTED state %d\n", cb->state);
361                 return -1;
362         }
363         return 0;
364 }
365
366 static void rping_setup_wr(struct rping_cb *cb)
367 {
368         cb->recv_sgl.addr = (uint64_t) (unsigned long) &cb->recv_buf;
369         cb->recv_sgl.length = sizeof cb->recv_buf;
370         cb->recv_sgl.lkey = cb->recv_mr->lkey;
371         cb->rq_wr.sg_list = &cb->recv_sgl;
372         cb->rq_wr.num_sge = 1;
373
374         cb->send_sgl.addr = (uint64_t) (unsigned long) &cb->send_buf;
375         cb->send_sgl.length = sizeof cb->send_buf;
376         cb->send_sgl.lkey = cb->send_mr->lkey;
377
378         cb->sq_wr.opcode = IBV_WR_SEND;
379         cb->sq_wr.send_flags = IBV_SEND_SIGNALED;
380         cb->sq_wr.sg_list = &cb->send_sgl;
381         cb->sq_wr.num_sge = 1;
382
383         cb->rdma_sgl.addr = (uint64_t) (unsigned long) cb->rdma_buf;
384         cb->rdma_sgl.lkey = cb->rdma_mr->lkey;
385         cb->rdma_sq_wr.send_flags = IBV_SEND_SIGNALED;
386         cb->rdma_sq_wr.sg_list = &cb->rdma_sgl;
387         cb->rdma_sq_wr.num_sge = 1;
388 }
389
390 static int rping_setup_buffers(struct rping_cb *cb)
391 {
392         int ret;
393
394         DEBUG_LOG("rping_setup_buffers called on cb %p\n", cb);
395
396         cb->recv_mr = ibv_reg_mr(cb->pd, &cb->recv_buf, sizeof cb->recv_buf,
397                                  IBV_ACCESS_LOCAL_WRITE);
398         if (!cb->recv_mr) {
399                 fprintf(stderr, "recv_buf reg_mr failed\n");
400                 return errno;
401         }
402
403         cb->send_mr = ibv_reg_mr(cb->pd, &cb->send_buf, sizeof cb->send_buf, 0);
404         if (!cb->send_mr) {
405                 fprintf(stderr, "send_buf reg_mr failed\n");
406                 ret = errno;
407                 goto err1;
408         }
409
410         cb->rdma_buf = malloc(cb->size);
411         if (!cb->rdma_buf) {
412                 fprintf(stderr, "rdma_buf malloc failed\n");
413                 ret = -ENOMEM;
414                 goto err2;
415         }
416
417         cb->rdma_mr = ibv_reg_mr(cb->pd, cb->rdma_buf, cb->size,
418                                  IBV_ACCESS_LOCAL_WRITE |
419                                  IBV_ACCESS_REMOTE_READ |
420                                  IBV_ACCESS_REMOTE_WRITE);
421         if (!cb->rdma_mr) {
422                 fprintf(stderr, "rdma_buf reg_mr failed\n");
423                 ret = errno;
424                 goto err3;
425         }
426
427         if (!cb->server) {
428                 cb->start_buf = malloc(cb->size);
429                 if (!cb->start_buf) {
430                         fprintf(stderr, "start_buf malloc failed\n");
431                         ret = -ENOMEM;
432                         goto err4;
433                 }
434
435                 cb->start_mr = ibv_reg_mr(cb->pd, cb->start_buf, cb->size,
436                                           IBV_ACCESS_LOCAL_WRITE | 
437                                           IBV_ACCESS_REMOTE_READ |
438                                           IBV_ACCESS_REMOTE_WRITE);
439                 if (!cb->start_mr) {
440                         fprintf(stderr, "start_buf reg_mr failed\n");
441                         ret = errno;
442                         goto err5;
443                 }
444         }
445
446         rping_setup_wr(cb);
447         DEBUG_LOG("allocated & registered buffers...\n");
448         return 0;
449
450 err5:
451         free(cb->start_buf);
452 err4:
453         ibv_dereg_mr(cb->rdma_mr);
454 err3:
455         free(cb->rdma_buf);
456 err2:
457         ibv_dereg_mr(cb->send_mr);
458 err1:
459         ibv_dereg_mr(cb->recv_mr);
460         return ret;
461 }
462
463 static void rping_free_buffers(struct rping_cb *cb)
464 {
465         DEBUG_LOG("rping_free_buffers called on cb %p\n", cb);
466         ibv_dereg_mr(cb->recv_mr);
467         ibv_dereg_mr(cb->send_mr);
468         ibv_dereg_mr(cb->rdma_mr);
469         free(cb->rdma_buf);
470         if (!cb->server) {
471                 ibv_dereg_mr(cb->start_mr);
472                 free(cb->start_buf);
473         }
474 }
475
476 static int rping_create_qp(struct rping_cb *cb)
477 {
478         struct ibv_qp_init_attr init_attr;
479         int ret;
480
481         memset(&init_attr, 0, sizeof(init_attr));
482         init_attr.cap.max_send_wr = RPING_SQ_DEPTH;
483         init_attr.cap.max_recv_wr = 2;
484         init_attr.cap.max_recv_sge = 1;
485         init_attr.cap.max_send_sge = 1;
486         init_attr.qp_type = IBV_QPT_RC;
487         init_attr.send_cq = cb->cq;
488         init_attr.recv_cq = cb->cq;
489
490         if (cb->server) {
491                 ret = rdma_create_qp(cb->child_cm_id, cb->pd, &init_attr);
492                 if (!ret)
493                         cb->qp = cb->child_cm_id->qp;
494         } else {
495                 ret = rdma_create_qp(cb->cm_id, cb->pd, &init_attr);
496                 if (!ret)
497                         cb->qp = cb->cm_id->qp;
498         }
499
500         return ret;
501 }
502
/*
 * Tear down the verbs objects created by rping_setup_qp(), in reverse
 * order of creation: queue pair, completion queue, completion channel,
 * protection domain.  Return values of the destroy calls are ignored.
 */
static void rping_free_qp(struct rping_cb *cb)
{
        ibv_destroy_qp(cb->qp);
        ibv_destroy_cq(cb->cq);
        ibv_destroy_comp_channel(cb->channel);
        ibv_dealloc_pd(cb->pd);
}
510
511 static int rping_setup_qp(struct rping_cb *cb, struct rdma_cm_id *cm_id)
512 {
513         int ret;
514
515         cb->pd = ibv_alloc_pd(cm_id->verbs);
516         if (!cb->pd) {
517                 fprintf(stderr, "ibv_alloc_pd failed\n");
518                 return errno;
519         }
520         DEBUG_LOG("created pd %p\n", cb->pd);
521
522         cb->channel = ibv_create_comp_channel(cm_id->verbs);
523         if (!cb->channel) {
524                 fprintf(stderr, "ibv_create_comp_channel failed\n");
525                 ret = errno;
526                 goto err1;
527         }
528         DEBUG_LOG("created channel %p\n", cb->channel);
529
530         cb->cq = ibv_create_cq(cm_id->verbs, RPING_SQ_DEPTH * 2, cb,
531                                 cb->channel, 0);
532         if (!cb->cq) {
533                 fprintf(stderr, "ibv_create_cq failed\n");
534                 ret = errno;
535                 goto err2;
536         }
537         DEBUG_LOG("created cq %p\n", cb->cq);
538
539         ret = ibv_req_notify_cq(cb->cq, 0);
540         if (ret) {
541                 fprintf(stderr, "ibv_create_cq failed\n");
542                 ret = errno;
543                 goto err3;
544         }
545
546         ret = rping_create_qp(cb);
547         if (ret) {
548                 perror("rdma_create_qp");
549                 goto err3;
550         }
551         DEBUG_LOG("created qp %p\n", cb->qp);
552         return 0;
553
554 err3:
555         ibv_destroy_cq(cb->cq);
556 err2:
557         ibv_destroy_comp_channel(cb->channel);
558 err1:
559         ibv_dealloc_pd(cb->pd);
560         return ret;
561 }
562
563 static void *cm_thread(void *arg)
564 {
565         struct rping_cb *cb = arg;
566         struct rdma_cm_event *event;
567         int ret;
568
569         while (1) {
570                 ret = rdma_get_cm_event(cb->cm_channel, &event);
571                 if (ret) {
572                         perror("rdma_get_cm_event");
573                         exit(ret);
574                 }
575                 ret = rping_cma_event_handler(event->id, event);
576                 rdma_ack_cm_event(event);
577                 if (ret)
578                         exit(ret);
579         }
580 }
581
582 static void *cq_thread(void *arg)
583 {
584         struct rping_cb *cb = arg;
585         struct ibv_cq *ev_cq;
586         void *ev_ctx;
587         int ret;
588         
589         DEBUG_LOG("cq_thread started.\n");
590
591         while (1) {     
592                 pthread_testcancel();
593
594                 ret = ibv_get_cq_event(cb->channel, &ev_cq, &ev_ctx);
595                 if (ret) {
596                         fprintf(stderr, "Failed to get cq event!\n");
597                         pthread_exit(NULL);
598                 }
599                 if (ev_cq != cb->cq) {
600                         fprintf(stderr, "Unknown CQ!\n");
601                         pthread_exit(NULL);
602                 }
603                 ret = ibv_req_notify_cq(cb->cq, 0);
604                 if (ret) {
605                         fprintf(stderr, "Failed to set notify!\n");
606                         pthread_exit(NULL);
607                 }
608                 ret = rping_cq_event_handler(cb);
609                 ibv_ack_cq_events(cb->cq, 1);
610                 if (ret)
611                         pthread_exit(NULL);
612         }
613 }
614
615 static void rping_format_send(struct rping_cb *cb, char *buf, struct ibv_mr *mr)
616 {
617         struct rping_rdma_info *info = &cb->send_buf;
618
619         info->buf = htonll((uint64_t) (unsigned long) buf);
620         info->rkey = htonl(mr->rkey);
621         info->size = htonl(cb->size);
622
623         DEBUG_LOG("RDMA addr %" PRIx64" rkey %x len %d\n",
624                   ntohll(info->buf), ntohl(info->rkey), ntohl(info->size));
625 }
626
627 static int rping_test_server(struct rping_cb *cb)
628 {
629         struct ibv_send_wr *bad_wr;
630         int ret;
631
632         while (1) {
633                 /* Wait for client's Start STAG/TO/Len */
634                 sem_wait(&cb->sem);
635                 if (cb->state != RDMA_READ_ADV) {
636                         fprintf(stderr, "wait for RDMA_READ_ADV state %d\n",
637                                 cb->state);
638                         ret = -1;
639                         break;
640                 }
641
642                 DEBUG_LOG("server received sink adv\n");
643
644                 /* Issue RDMA Read. */
645                 cb->rdma_sq_wr.opcode = IBV_WR_RDMA_READ;
646                 cb->rdma_sq_wr.wr.rdma.rkey = cb->remote_rkey;
647                 cb->rdma_sq_wr.wr.rdma.remote_addr = cb->remote_addr;
648                 cb->rdma_sq_wr.sg_list->length = cb->remote_len;
649
650                 ret = ibv_post_send(cb->qp, &cb->rdma_sq_wr, &bad_wr);
651                 if (ret) {
652                         fprintf(stderr, "post send error %d\n", ret);
653                         break;
654                 }
655                 DEBUG_LOG("server posted rdma read req \n");
656
657                 /* Wait for read completion */
658                 sem_wait(&cb->sem);
659                 if (cb->state != RDMA_READ_COMPLETE) {
660                         fprintf(stderr, "wait for RDMA_READ_COMPLETE state %d\n",
661                                 cb->state);
662                         ret = -1;
663                         break;
664                 }
665                 DEBUG_LOG("server received read complete\n");
666
667                 /* Display data in recv buf */
668                 if (cb->verbose)
669                         printf("server ping data: %s\n", cb->rdma_buf);
670
671                 /* Tell client to continue */
672                 ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
673                 if (ret) {
674                         fprintf(stderr, "post send error %d\n", ret);
675                         break;
676                 }
677                 DEBUG_LOG("server posted go ahead\n");
678
679                 /* Wait for client's RDMA STAG/TO/Len */
680                 sem_wait(&cb->sem);
681                 if (cb->state != RDMA_WRITE_ADV) {
682                         fprintf(stderr, "wait for RDMA_WRITE_ADV state %d\n",
683                                 cb->state);
684                         ret = -1;
685                         break;
686                 }
687                 DEBUG_LOG("server received sink adv\n");
688
689                 /* RDMA Write echo data */
690                 cb->rdma_sq_wr.opcode = IBV_WR_RDMA_WRITE;
691                 cb->rdma_sq_wr.wr.rdma.rkey = cb->remote_rkey;
692                 cb->rdma_sq_wr.wr.rdma.remote_addr = cb->remote_addr;
693                 cb->rdma_sq_wr.sg_list->length = strlen(cb->rdma_buf) + 1;
694                 DEBUG_LOG("rdma write from lkey %x laddr %" PRIx64 " len %d\n",
695                           cb->rdma_sq_wr.sg_list->lkey,
696                           cb->rdma_sq_wr.sg_list->addr,
697                           cb->rdma_sq_wr.sg_list->length);
698
699                 ret = ibv_post_send(cb->qp, &cb->rdma_sq_wr, &bad_wr);
700                 if (ret) {
701                         fprintf(stderr, "post send error %d\n", ret);
702                         break;
703                 }
704
705                 /* Wait for completion */
706                 ret = sem_wait(&cb->sem);
707                 if (cb->state != RDMA_WRITE_COMPLETE) {
708                         fprintf(stderr, "wait for RDMA_WRITE_COMPLETE state %d\n",
709                                 cb->state);
710                         ret = -1;
711                         break;
712                 }
713                 DEBUG_LOG("server rdma write complete \n");
714
715                 /* Tell client to begin again */
716                 ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
717                 if (ret) {
718                         fprintf(stderr, "post send error %d\n", ret);
719                         break;
720                 }
721                 DEBUG_LOG("server posted go ahead\n");
722         }
723
724         return ret;
725 }
726
727 static int rping_bind_server(struct rping_cb *cb)
728 {
729         int ret;
730
731         if (cb->sin.ss_family == AF_INET)
732                 ((struct sockaddr_in *) &cb->sin)->sin_port = cb->port;
733         else
734                 ((struct sockaddr_in6 *) &cb->sin)->sin6_port = cb->port;
735
736         ret = rdma_bind_addr(cb->cm_id, (struct sockaddr *) &cb->sin);
737         if (ret) {
738                 perror("rdma_bind_addr");
739                 return ret;
740         }
741         DEBUG_LOG("rdma_bind_addr successful\n");
742
743         DEBUG_LOG("rdma_listen\n");
744         ret = rdma_listen(cb->cm_id, 3);
745         if (ret) {
746                 perror("rdma_listen");
747                 return ret;
748         }
749
750         return 0;
751 }
752
753 static struct rping_cb *clone_cb(struct rping_cb *listening_cb)
754 {
755         struct rping_cb *cb = malloc(sizeof *cb);
756         if (!cb)
757                 return NULL;
758         *cb = *listening_cb;
759         cb->child_cm_id->context = cb;
760         return cb;
761 }
762
/*
 * Release a control block obtained from clone_cb().  Only the block's own
 * memory is freed; nothing it points to is released here.
 */
static void free_cb(struct rping_cb *cb)
{
        free(cb);
}
767
768 static void *rping_persistent_server_thread(void *arg)
769 {
770         struct rping_cb *cb = arg;
771         struct ibv_recv_wr *bad_wr;
772         int ret;
773
774         ret = rping_setup_qp(cb, cb->child_cm_id);
775         if (ret) {
776                 fprintf(stderr, "setup_qp failed: %d\n", ret);
777                 goto err0;
778         }
779
780         ret = rping_setup_buffers(cb);
781         if (ret) {
782                 fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
783                 goto err1;
784         }
785
786         ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
787         if (ret) {
788                 fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
789                 goto err2;
790         }
791
792         pthread_create(&cb->cqthread, NULL, cq_thread, cb);
793
794         ret = rping_accept(cb);
795         if (ret) {
796                 fprintf(stderr, "connect error %d\n", ret);
797                 goto err3;
798         }
799
800         rping_test_server(cb);
801         rdma_disconnect(cb->child_cm_id);
802         pthread_join(cb->cqthread, NULL);
803         rping_free_buffers(cb);
804         rping_free_qp(cb);
805         rdma_destroy_id(cb->child_cm_id);
806         free_cb(cb);
807         return NULL;
808 err3:
809         pthread_cancel(cb->cqthread);
810         pthread_join(cb->cqthread, NULL);
811 err2:
812         rping_free_buffers(cb);
813 err1:
814         rping_free_qp(cb);
815 err0:
816         free_cb(cb);
817         return NULL;
818 }
819
820 static int rping_run_persistent_server(struct rping_cb *listening_cb)
821 {
822         int ret;
823         struct rping_cb *cb;
824
825         ret = rping_bind_server(listening_cb);
826         if (ret)
827                 return ret;
828
829         while (1) {
830                 sem_wait(&listening_cb->sem);
831                 if (listening_cb->state != CONNECT_REQUEST) {
832                         fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
833                                 listening_cb->state);
834                         return -1;
835                 }
836
837                 cb = clone_cb(listening_cb);
838                 if (!cb)
839                         return -1;
840                 pthread_create(&cb->persistent_server_thread, NULL, rping_persistent_server_thread, cb);
841         }
842         return 0;
843 }
844
845 static int rping_run_server(struct rping_cb *cb)
846 {
847         struct ibv_recv_wr *bad_wr;
848         int ret;
849
850         ret = rping_bind_server(cb);
851         if (ret)
852                 return ret;
853
854         sem_wait(&cb->sem);
855         if (cb->state != CONNECT_REQUEST) {
856                 fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
857                         cb->state);
858                 return -1;
859         }
860
861         ret = rping_setup_qp(cb, cb->child_cm_id);
862         if (ret) {
863                 fprintf(stderr, "setup_qp failed: %d\n", ret);
864                 return ret;
865         }
866
867         ret = rping_setup_buffers(cb);
868         if (ret) {
869                 fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
870                 goto err1;
871         }
872
873         ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
874         if (ret) {
875                 fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
876                 goto err2;
877         }
878
879         pthread_create(&cb->cqthread, NULL, cq_thread, cb);
880
881         ret = rping_accept(cb);
882         if (ret) {
883                 fprintf(stderr, "connect error %d\n", ret);
884                 goto err2;
885         }
886
887         rping_test_server(cb);
888         rdma_disconnect(cb->child_cm_id);
889         pthread_join(cb->cqthread, NULL);
890         rdma_destroy_id(cb->child_cm_id);
891 err2:
892         rping_free_buffers(cb);
893 err1:
894         rping_free_qp(cb);
895
896         return ret;
897 }
898
899 static int rping_test_client(struct rping_cb *cb)
900 {
901         int ping, start, cc, i, ret = 0;
902         struct ibv_send_wr *bad_wr;
903         unsigned char c;
904
905         start = 65;
906         for (ping = 0; !cb->count || ping < cb->count; ping++) {
907                 cb->state = RDMA_READ_ADV;
908
909                 /* Put some ascii text in the buffer. */
910                 cc = sprintf(cb->start_buf, RPING_MSG_FMT, ping);
911                 for (i = cc, c = start; i < cb->size; i++) {
912                         cb->start_buf[i] = c;
913                         c++;
914                         if (c > 122)
915                                 c = 65;
916                 }
917                 start++;
918                 if (start > 122)
919                         start = 65;
920                 cb->start_buf[cb->size - 1] = 0;
921
922                 rping_format_send(cb, cb->start_buf, cb->start_mr);
923                 ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
924                 if (ret) {
925                         fprintf(stderr, "post send error %d\n", ret);
926                         break;
927                 }
928
929                 /* Wait for server to ACK */
930                 sem_wait(&cb->sem);
931                 if (cb->state != RDMA_WRITE_ADV) {
932                         fprintf(stderr, "wait for RDMA_WRITE_ADV state %d\n",
933                                 cb->state);
934                         ret = -1;
935                         break;
936                 }
937
938                 rping_format_send(cb, cb->rdma_buf, cb->rdma_mr);
939                 ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
940                 if (ret) {
941                         fprintf(stderr, "post send error %d\n", ret);
942                         break;
943                 }
944
945                 /* Wait for the server to say the RDMA Write is complete. */
946                 sem_wait(&cb->sem);
947                 if (cb->state != RDMA_WRITE_COMPLETE) {
948                         fprintf(stderr, "wait for RDMA_WRITE_COMPLETE state %d\n",
949                                 cb->state);
950                         ret = -1;
951                         break;
952                 }
953
954                 if (cb->validate)
955                         if (memcmp(cb->start_buf, cb->rdma_buf, cb->size)) {
956                                 fprintf(stderr, "data mismatch!\n");
957                                 ret = -1;
958                                 break;
959                         }
960
961                 if (cb->verbose)
962                         printf("ping data: %s\n", cb->rdma_buf);
963         }
964
965         return ret;
966 }
967
968 static int rping_connect_client(struct rping_cb *cb)
969 {
970         struct rdma_conn_param conn_param;
971         int ret;
972
973         memset(&conn_param, 0, sizeof conn_param);
974         conn_param.responder_resources = 1;
975         conn_param.initiator_depth = 1;
976         conn_param.retry_count = 10;
977
978         ret = rdma_connect(cb->cm_id, &conn_param);
979         if (ret) {
980                 perror("rdma_connect");
981                 return ret;
982         }
983
984         sem_wait(&cb->sem);
985         if (cb->state != CONNECTED) {
986                 fprintf(stderr, "wait for CONNECTED state %d\n", cb->state);
987                 return -1;
988         }
989
990         DEBUG_LOG("rmda_connect successful\n");
991         return 0;
992 }
993
994 static int rping_bind_client(struct rping_cb *cb)
995 {
996         int ret;
997
998         if (cb->sin.ss_family == AF_INET)
999                 ((struct sockaddr_in *) &cb->sin)->sin_port = cb->port;
1000         else
1001                 ((struct sockaddr_in6 *) &cb->sin)->sin6_port = cb->port;
1002
1003         ret = rdma_resolve_addr(cb->cm_id, NULL, (struct sockaddr *) &cb->sin, 2000);
1004         if (ret) {
1005                 perror("rdma_resolve_addr");
1006                 return ret;
1007         }
1008
1009         sem_wait(&cb->sem);
1010         if (cb->state != ROUTE_RESOLVED) {
1011                 fprintf(stderr, "waiting for addr/route resolution state %d\n",
1012                         cb->state);
1013                 return -1;
1014         }
1015
1016         DEBUG_LOG("rdma_resolve_addr - rdma_resolve_route successful\n");
1017         return 0;
1018 }
1019
1020 static int rping_run_client(struct rping_cb *cb)
1021 {
1022         struct ibv_recv_wr *bad_wr;
1023         int ret;
1024
1025         ret = rping_bind_client(cb);
1026         if (ret)
1027                 return ret;
1028
1029         ret = rping_setup_qp(cb, cb->cm_id);
1030         if (ret) {
1031                 fprintf(stderr, "setup_qp failed: %d\n", ret);
1032                 return ret;
1033         }
1034
1035         ret = rping_setup_buffers(cb);
1036         if (ret) {
1037                 fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
1038                 goto err1;
1039         }
1040
1041         ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
1042         if (ret) {
1043                 fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
1044                 goto err2;
1045         }
1046
1047         pthread_create(&cb->cqthread, NULL, cq_thread, cb);
1048
1049         ret = rping_connect_client(cb);
1050         if (ret) {
1051                 fprintf(stderr, "connect error %d\n", ret);
1052                 goto err3;
1053         }
1054
1055         ret = rping_test_client(cb);
1056         if (ret) {
1057                 fprintf(stderr, "rping client failed: %d\n", ret);
1058                 goto err4;
1059         }
1060         ret = 0;
1061 err4:
1062         rdma_disconnect(cb->cm_id);
1063 err3:
1064         pthread_join(cb->cqthread, NULL);
1065 err2:
1066         rping_free_buffers(cb);
1067 err1:
1068         rping_free_qp(cb);
1069
1070         return ret;
1071 }
1072
/*
 * Resolve the host name or address string `dst` with getaddrinfo() and
 * copy the first result into `addr`.
 *
 * Only the first entry of the result list is examined.  For an IPv4
 * result sizeof(struct sockaddr_in) bytes are copied, for an IPv6 result
 * sizeof(struct sockaddr_in6) bytes, so `addr` must have room for the
 * larger of the two.
 *
 * Returns 0 on success, the getaddrinfo() error code if resolution
 * fails, or -1 if the first result is neither IPv4 nor IPv6 (`addr` is
 * left untouched in both failure cases).
 */
static int get_addr(char *dst, struct sockaddr *addr)
{
	struct addrinfo *res;
	size_t copy_len;
	int ret;

	ret = getaddrinfo(dst, NULL, NULL, &res);
	if (ret) {
		printf("getaddrinfo failed - invalid hostname or IP address\n");
		return ret;
	}

	switch (res->ai_family) {
	case PF_INET:
		copy_len = sizeof(struct sockaddr_in);
		break;
	case PF_INET6:
		copy_len = sizeof(struct sockaddr_in6);
		break;
	default:
		copy_len = 0;
		ret = -1;
		break;
	}

	if (copy_len != 0)
		memcpy(addr, res->ai_addr, copy_len);

	freeaddrinfo(res);
	return ret;
}
1094
/*
 * Print the command-line synopsis for server and client mode, followed
 * by a one-line description of each option, to stdout.  `name` is the
 * program name shown at the start of both synopsis lines.
 */
static void usage(char *name)
{
	static const char *const option_help[] = {
		"\t-c\t\tclient side\n",
		"\t-s\t\tserver side.  To bind to any address with IPv6 use -a ::0\n",
		"\t-v\t\tdisplay ping data to stdout\n",
		"\t-V\t\tvalidate ping data\n",
		"\t-d\t\tdebug printfs\n",
		"\t-S size \tping data size\n",
		"\t-C count\tping count times\n",
		"\t-a addr\t\taddress\n",
		"\t-p port\t\tport\n",
		"\t-P\t\tpersistent server mode allowing multiple connections\n",
	};
	size_t i;

	printf("%s -s [-vVd] [-S size] [-C count] [-a addr] [-p port]\n", name);
	printf("%s -c [-vVd] [-S size] [-C count] -a addr [-p port]\n", name);

	for (i = 0; i < sizeof(option_help) / sizeof(option_help[0]); i++)
		fputs(option_help[i], stdout);
}
1112
1113 int main(int argc, char *argv[])
1114 {
1115         struct rping_cb *cb;
1116         int op;
1117         int ret = 0;
1118         int persistent_server = 0;
1119
1120         cb = malloc(sizeof(*cb));
1121         if (!cb)
1122                 return -ENOMEM;
1123
1124         memset(cb, 0, sizeof(*cb));
1125         cb->server = -1;
1126         cb->state = IDLE;
1127         cb->size = 64;
1128         cb->sin.ss_family = PF_INET;
1129         cb->port = htons(7174);
1130         sem_init(&cb->sem, 0, 0);
1131
1132         opterr = 0;
1133         while ((op=getopt(argc, argv, "a:Pp:C:S:t:scvVd")) != -1) {
1134                 switch (op) {
1135                 case 'a':
1136                         ret = get_addr(optarg, (struct sockaddr *) &cb->sin);
1137                         break;
1138                 case 'P':
1139                         persistent_server = 1;
1140                         break;
1141                 case 'p':
1142                         cb->port = htons(atoi(optarg));
1143                         DEBUG_LOG("port %d\n", (int) atoi(optarg));
1144                         break;
1145                 case 's':
1146                         cb->server = 1;
1147                         DEBUG_LOG("server\n");
1148                         break;
1149                 case 'c':
1150                         cb->server = 0;
1151                         DEBUG_LOG("client\n");
1152                         break;
1153                 case 'S':
1154                         cb->size = atoi(optarg);
1155                         if ((cb->size < RPING_MIN_BUFSIZE) ||
1156                             (cb->size > (RPING_BUFSIZE - 1))) {
1157                                 fprintf(stderr, "Invalid size %d "
1158                                        "(valid range is %d to %d)\n",
1159                                        (int)cb->size, (int)(RPING_MIN_BUFSIZE),
1160                                        (int)(RPING_BUFSIZE));
1161                                 ret = EINVAL;
1162                         } else
1163                                 DEBUG_LOG("size %d\n", (int) atoi(optarg));
1164                         break;
1165                 case 'C':
1166                         cb->count = atoi(optarg);
1167                         if (cb->count < 0) {
1168                                 fprintf(stderr, "Invalid count %d\n",
1169                                         cb->count);
1170                                 ret = EINVAL;
1171                         } else
1172                                 DEBUG_LOG("count %d\n", (int) cb->count);
1173                         break;
1174                 case 'v':
1175                         cb->verbose++;
1176                         DEBUG_LOG("verbose\n");
1177                         break;
1178                 case 'V':
1179                         cb->validate++;
1180                         DEBUG_LOG("validate data\n");
1181                         break;
1182                 case 'd':
1183                         debug++;
1184                         break;
1185                 default:
1186                         usage("rping");
1187                         ret = EINVAL;
1188                         goto out;
1189                 }
1190         }
1191         if (ret)
1192                 goto out;
1193
1194         if (cb->server == -1) {
1195                 usage("rping");
1196                 ret = EINVAL;
1197                 goto out;
1198         }
1199
1200         cb->cm_channel = rdma_create_event_channel();
1201         if (!cb->cm_channel) {
1202                 perror("rdma_create_event_channel");
1203                 goto out;
1204         }
1205
1206         ret = rdma_create_id(cb->cm_channel, &cb->cm_id, cb, RDMA_PS_TCP);
1207         if (ret) {
1208                 perror("rdma_create_id");
1209                 goto out2;
1210         }
1211         DEBUG_LOG("created cm_id %p\n", cb->cm_id);
1212
1213         pthread_create(&cb->cmthread, NULL, cm_thread, cb);
1214
1215         if (cb->server) {
1216                 if (persistent_server)
1217                         ret = rping_run_persistent_server(cb);
1218                 else
1219                         ret = rping_run_server(cb);
1220         } else
1221                 ret = rping_run_client(cb);
1222
1223         DEBUG_LOG("destroy cm_id %p\n", cb->cm_id);
1224         rdma_destroy_id(cb->cm_id);
1225 out2:
1226         rdma_destroy_event_channel(cb->cm_channel);
1227 out:
1228         free(cb);
1229         return ret;
1230 }