]> CyberLeo.Net >> Repos - FreeBSD/releng/10.0.git/blob - sys/ofed/drivers/infiniband/ulp/sdp/sdp_rx.c
- Copy stable/10 (r259064) to releng/10.0 as part of the
[FreeBSD/releng/10.0.git] / sys / ofed / drivers / infiniband / ulp / sdp / sdp_rx.c
1 /*
2  * Copyright (c) 2009 Mellanox Technologies Ltd.  All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the
8  * OpenIB.org BSD license below:
9  *
10  *     Redistribution and use in source and binary forms, with or
11  *     without modification, are permitted provided that the following
12  *     conditions are met:
13  *
14  *      - Redistributions of source code must retain the above
15  *        copyright notice, this list of conditions and the following
16  *        disclaimer.
17  *
18  *      - Redistributions in binary form must reproduce the above
19  *        copyright notice, this list of conditions and the following
20  *        disclaimer in the documentation and/or other materials
21  *        provided with the distribution.
22  *
23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30  * SOFTWARE.
31  */
32 #include "sdp.h"
33
34 SDP_MODPARAM_INT(rcvbuf_initial_size, 32 * 1024,
35                 "Receive buffer initial size in bytes.");
36 SDP_MODPARAM_SINT(rcvbuf_scale, 0x8,
37                 "Receive buffer size scale factor.");
38
39 /* Like tcp_fin - called when SDP_MID_DISCONNECT is received */
40 static void
41 sdp_handle_disconn(struct sdp_sock *ssk)
42 {
43
44         sdp_dbg(ssk->socket, "%s\n", __func__);
45
46         SDP_WLOCK_ASSERT(ssk);
47         if (TCPS_HAVERCVDFIN(ssk->state) == 0)
48                 socantrcvmore(ssk->socket);
49
50         switch (ssk->state) {
51         case TCPS_SYN_RECEIVED:
52         case TCPS_ESTABLISHED:
53                 ssk->state = TCPS_CLOSE_WAIT;
54                 break;
55
56         case TCPS_FIN_WAIT_1:
57                 /* Received a reply FIN - start Infiniband tear down */
58                 sdp_dbg(ssk->socket,
59                     "%s: Starting Infiniband tear down sending DREQ\n",
60                     __func__);
61
62                 sdp_cancel_dreq_wait_timeout(ssk);
63                 ssk->qp_active = 0;
64                 if (ssk->id) {
65                         struct rdma_cm_id *id;
66
67                         id = ssk->id;
68                         SDP_WUNLOCK(ssk);
69                         rdma_disconnect(id);
70                         SDP_WLOCK(ssk);
71                 } else {
72                         sdp_warn(ssk->socket,
73                             "%s: ssk->id is NULL\n", __func__);
74                         return;
75                 }
76                 break;
77         case TCPS_TIME_WAIT:
78                 /* This is a mutual close situation and we've got the DREQ from
79                    the peer before the SDP_MID_DISCONNECT */
80                 break;
81         case TCPS_CLOSED:
82                 /* FIN arrived after IB teardown started - do nothing */
83                 sdp_dbg(ssk->socket, "%s: fin in state %s\n",
84                     __func__, sdp_state_str(ssk->state));
85                 return;
86         default:
87                 sdp_warn(ssk->socket,
88                     "%s: FIN in unexpected state. state=%d\n",
89                     __func__, ssk->state);
90                 break;
91         }
92 }
93
94 static int
95 sdp_post_recv(struct sdp_sock *ssk)
96 {
97         struct sdp_buf *rx_req;
98         int i, rc;
99         u64 addr;
100         struct ib_device *dev;
101         struct ib_recv_wr rx_wr = { NULL };
102         struct ib_sge ibsge[SDP_MAX_RECV_SGES];
103         struct ib_sge *sge = ibsge;
104         struct ib_recv_wr *bad_wr;
105         struct mbuf *mb, *m;
106         struct sdp_bsdh *h;
107         int id = ring_head(ssk->rx_ring);
108
109         /* Now, allocate and repost recv */
110         sdp_prf(ssk->socket, mb, "Posting mb");
111         mb = m_getm2(NULL, ssk->recv_bytes, M_NOWAIT, MT_DATA, M_PKTHDR);
112         if (mb == NULL) {
113                 /* Retry so we can't stall out with no memory. */
114                 if (!rx_ring_posted(ssk))
115                         queue_work(rx_comp_wq, &ssk->rx_comp_work);
116                 return -1;
117         }
118         for (m = mb; m != NULL; m = m->m_next) {
119                 m->m_len = (m->m_flags & M_EXT) ? m->m_ext.ext_size :
120                         ((m->m_flags & M_PKTHDR) ? MHLEN : MLEN);
121                 mb->m_pkthdr.len += m->m_len;
122         }
123         h = mtod(mb, struct sdp_bsdh *);
124         rx_req = ssk->rx_ring.buffer + (id & (SDP_RX_SIZE - 1));
125         rx_req->mb = mb;
126         dev = ssk->ib_device;
127         for (i = 0;  mb != NULL; i++, mb = mb->m_next, sge++) {
128                 addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
129                     DMA_TO_DEVICE);
130                 /* TODO: proper error handling */
131                 BUG_ON(ib_dma_mapping_error(dev, addr));
132                 BUG_ON(i >= SDP_MAX_RECV_SGES);
133                 rx_req->mapping[i] = addr;
134                 sge->addr = addr;
135                 sge->length = mb->m_len;
136                 sge->lkey = ssk->sdp_dev->mr->lkey;
137         }
138
139         rx_wr.next = NULL;
140         rx_wr.wr_id = id | SDP_OP_RECV;
141         rx_wr.sg_list = ibsge;
142         rx_wr.num_sge = i;
143         rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr);
144         if (unlikely(rc)) {
145                 sdp_warn(ssk->socket, "ib_post_recv failed. status %d\n", rc);
146
147                 sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
148                 m_freem(mb);
149
150                 sdp_notify(ssk, ECONNRESET);
151
152                 return -1;
153         }
154
155         atomic_inc(&ssk->rx_ring.head);
156         SDPSTATS_COUNTER_INC(post_recv);
157
158         return 0;
159 }
160
161 static inline int
162 sdp_post_recvs_needed(struct sdp_sock *ssk)
163 {
164         unsigned long bytes_in_process;
165         unsigned long max_bytes;
166         int buffer_size;
167         int posted;
168
169         if (!ssk->qp_active || !ssk->socket)
170                 return 0;
171
172         posted = rx_ring_posted(ssk);
173         if (posted >= SDP_RX_SIZE)
174                 return 0;
175         if (posted < SDP_MIN_TX_CREDITS)
176                 return 1;
177
178         buffer_size = ssk->recv_bytes;
179         max_bytes = max(ssk->socket->so_snd.sb_hiwat,
180             (1 + SDP_MIN_TX_CREDITS) * buffer_size);
181         max_bytes *= rcvbuf_scale;
182         /*
183          * Compute bytes in the receive queue and socket buffer.
184          */
185         bytes_in_process = (posted - SDP_MIN_TX_CREDITS) * buffer_size;
186         bytes_in_process += ssk->socket->so_rcv.sb_cc;
187
188         return bytes_in_process < max_bytes;
189 }
190
191 static inline void
192 sdp_post_recvs(struct sdp_sock *ssk)
193 {
194
195         while (sdp_post_recvs_needed(ssk))
196                 if (sdp_post_recv(ssk))
197                         return;
198 }
199
200 static inline struct mbuf *
201 sdp_sock_queue_rcv_mb(struct socket *sk, struct mbuf *mb)
202 {
203         struct sdp_sock *ssk = sdp_sk(sk);
204         struct sdp_bsdh *h;
205
206         h = mtod(mb, struct sdp_bsdh *);
207
208 #ifdef SDP_ZCOPY
209         SDP_SKB_CB(mb)->seq = rcv_nxt(ssk);
210         if (h->mid == SDP_MID_SRCAVAIL) {
211                 struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
212                 struct rx_srcavail_state *rx_sa;
213                 
214                 ssk->srcavail_cancel_mseq = 0;
215
216                 ssk->rx_sa = rx_sa = RX_SRCAVAIL_STATE(mb) = kzalloc(
217                                 sizeof(struct rx_srcavail_state), M_NOWAIT);
218
219                 rx_sa->mseq = ntohl(h->mseq);
220                 rx_sa->used = 0;
221                 rx_sa->len = mb_len = ntohl(srcah->len);
222                 rx_sa->rkey = ntohl(srcah->rkey);
223                 rx_sa->vaddr = be64_to_cpu(srcah->vaddr);
224                 rx_sa->flags = 0;
225
226                 if (ssk->tx_sa) {
227                         sdp_dbg_data(ssk->socket, "got RX SrcAvail while waiting "
228                                         "for TX SrcAvail. waking up TX SrcAvail"
229                                         "to be aborted\n");
230                         wake_up(sk->sk_sleep);
231                 }
232
233                 atomic_add(mb->len, &ssk->rcv_nxt);
234                 sdp_dbg_data(sk, "queueing SrcAvail. mb_len = %d vaddr = %lld\n",
235                         mb_len, rx_sa->vaddr);
236         } else
237 #endif
238         {
239                 atomic_add(mb->m_pkthdr.len, &ssk->rcv_nxt);
240         }
241
242         m_adj(mb, SDP_HEAD_SIZE);
243         SOCKBUF_LOCK(&sk->so_rcv);
244         if (unlikely(h->flags & SDP_OOB_PRES))
245                 sdp_urg(ssk, mb);
246         sbappend_locked(&sk->so_rcv, mb);
247         sorwakeup_locked(sk);
248         return mb;
249 }
250
251 static int
252 sdp_get_recv_bytes(struct sdp_sock *ssk, u32 new_size)
253 {
254
255         return MIN(new_size, SDP_MAX_PACKET);
256 }
257
258 int
259 sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
260 {
261
262         ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
263         sdp_post_recvs(ssk);
264
265         return 0;
266 }
267
268 int
269 sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
270 {
271         u32 curr_size = ssk->recv_bytes;
272         u32 max_size = SDP_MAX_PACKET;
273
274         if (new_size > curr_size && new_size <= max_size) {
275                 ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
276                 return 0;
277         }
278         return -1;
279 }
280
281 static void
282 sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
283 {
284         if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
285                 ssk->recv_request_head = ring_head(ssk->rx_ring) + 1;
286         else
287                 ssk->recv_request_head = ring_tail(ssk->rx_ring);
288         ssk->recv_request = 1;
289 }
290
291 static void
292 sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
293 {
294         u32 new_size = ntohl(buf->size);
295
296         if (new_size > ssk->xmit_size_goal)
297                 ssk->xmit_size_goal = new_size;
298 }
299
300 static struct mbuf *
301 sdp_recv_completion(struct sdp_sock *ssk, int id)
302 {
303         struct sdp_buf *rx_req;
304         struct ib_device *dev;
305         struct mbuf *mb;
306
307         if (unlikely(id != ring_tail(ssk->rx_ring))) {
308                 printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
309                         id, ring_tail(ssk->rx_ring));
310                 return NULL;
311         }
312
313         dev = ssk->ib_device;
314         rx_req = &ssk->rx_ring.buffer[id & (SDP_RX_SIZE - 1)];
315         mb = rx_req->mb;
316         sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
317
318         atomic_inc(&ssk->rx_ring.tail);
319         atomic_dec(&ssk->remote_credits);
320         return mb;
321 }
322
323 /* socket lock should be taken before calling this */
324 static int
325 sdp_process_rx_ctl_mb(struct sdp_sock *ssk, struct mbuf *mb)
326 {
327         struct sdp_bsdh *h;
328         struct socket *sk;
329
330         SDP_WLOCK_ASSERT(ssk);
331         sk = ssk->socket;
332         h = mtod(mb, struct sdp_bsdh *);
333         switch (h->mid) {
334         case SDP_MID_DATA:
335         case SDP_MID_SRCAVAIL:
336                 sdp_dbg(sk, "DATA after socket rcv was shutdown\n");
337
338                 /* got data in RCV_SHUTDOWN */
339                 if (ssk->state == TCPS_FIN_WAIT_1) {
340                         sdp_dbg(sk, "RX data when state = FIN_WAIT1\n");
341                         sdp_notify(ssk, ECONNRESET);
342                 }
343                 m_freem(mb);
344
345                 break;
346 #ifdef SDP_ZCOPY
347         case SDP_MID_RDMARDCOMPL:
348                 m_freem(mb);
349                 break;
350         case SDP_MID_SENDSM:
351                 sdp_handle_sendsm(ssk, ntohl(h->mseq_ack));
352                 m_freem(mb);
353                 break;
354         case SDP_MID_SRCAVAIL_CANCEL:
355                 sdp_dbg_data(sk, "Handling SrcAvailCancel\n");
356                 sdp_prf(sk, NULL, "Handling SrcAvailCancel");
357                 if (ssk->rx_sa) {
358                         ssk->srcavail_cancel_mseq = ntohl(h->mseq);
359                         ssk->rx_sa->flags |= RX_SA_ABORTED;
360                         ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get 
361                                               the dirty logic from recvmsg */
362                 } else {
363                         sdp_dbg(sk, "Got SrcAvailCancel - "
364                                         "but no SrcAvail in process\n");
365                 }
366                 m_freem(mb);
367                 break;
368         case SDP_MID_SINKAVAIL:
369                 sdp_dbg_data(sk, "Got SinkAvail - not supported: ignored\n");
370                 sdp_prf(sk, NULL, "Got SinkAvail - not supported: ignored");
371                 /* FALLTHROUGH */
372 #endif
373         case SDP_MID_ABORT:
374                 sdp_dbg_data(sk, "Handling ABORT\n");
375                 sdp_prf(sk, NULL, "Handling ABORT");
376                 sdp_notify(ssk, ECONNRESET);
377                 m_freem(mb);
378                 break;
379         case SDP_MID_DISCONN:
380                 sdp_dbg_data(sk, "Handling DISCONN\n");
381                 sdp_prf(sk, NULL, "Handling DISCONN");
382                 sdp_handle_disconn(ssk);
383                 break;
384         case SDP_MID_CHRCVBUF:
385                 sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
386                 sdp_handle_resize_request(ssk, (struct sdp_chrecvbuf *)(h+1));
387                 m_freem(mb);
388                 break;
389         case SDP_MID_CHRCVBUF_ACK:
390                 sdp_dbg_data(sk, "Handling RX CHRCVBUF_ACK\n");
391                 sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)(h+1));
392                 m_freem(mb);
393                 break;
394         default:
395                 /* TODO: Handle other messages */
396                 sdp_warn(sk, "SDP: FIXME MID %d\n", h->mid);
397                 m_freem(mb);
398         }
399
400         return 0;
401 }
402
403 static int
404 sdp_process_rx_mb(struct sdp_sock *ssk, struct mbuf *mb)
405 {
406         struct socket *sk;
407         struct sdp_bsdh *h;
408         unsigned long mseq_ack;
409         int credits_before;
410
411         h = mtod(mb, struct sdp_bsdh *);
412         sk = ssk->socket;
413         /*
414          * If another thread is in so_pcbfree this may be partially torn
415          * down but no further synchronization is required as the destroying
416          * thread will wait for receive to shutdown before discarding the
417          * socket.
418          */
419         if (sk == NULL) {
420                 m_freem(mb);
421                 return 0;
422         }
423
424         SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));
425
426         mseq_ack = ntohl(h->mseq_ack);
427         credits_before = tx_credits(ssk);
428         atomic_set(&ssk->tx_ring.credits, mseq_ack - ring_head(ssk->tx_ring) +
429                         1 + ntohs(h->bufs));
430         if (mseq_ack >= ssk->nagle_last_unacked)
431                 ssk->nagle_last_unacked = 0;
432
433         sdp_prf1(ssk->socket, mb, "RX %s +%d c:%d->%d mseq:%d ack:%d\n",
434                 mid2str(h->mid), ntohs(h->bufs), credits_before,
435                 tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));
436
437         if (unlikely(h->mid == SDP_MID_DATA &&
438             mb->m_pkthdr.len == SDP_HEAD_SIZE)) {
439                 /* Credit update is valid even after RCV_SHUTDOWN */
440                 m_freem(mb);
441                 return 0;
442         }
443
444         if ((h->mid != SDP_MID_DATA && h->mid != SDP_MID_SRCAVAIL) ||
445             TCPS_HAVERCVDFIN(ssk->state)) {
446                 sdp_prf(sk, NULL, "Control mb - queing to control queue");
447 #ifdef SDP_ZCOPY
448                 if (h->mid == SDP_MID_SRCAVAIL_CANCEL) {
449                         sdp_dbg_data(sk, "Got SrcAvailCancel. "
450                                         "seq: 0x%d seq_ack: 0x%d\n",
451                                         ntohl(h->mseq), ntohl(h->mseq_ack));
452                         ssk->srcavail_cancel_mseq = ntohl(h->mseq);
453                 }
454
455
456                 if (h->mid == SDP_MID_RDMARDCOMPL) {
457                         struct sdp_rrch *rrch = (struct sdp_rrch *)(h+1);
458                         sdp_dbg_data(sk, "RdmaRdCompl message arrived\n");
459                         sdp_handle_rdma_read_compl(ssk, ntohl(h->mseq_ack),
460                                         ntohl(rrch->len));
461                 }
462 #endif
463                 mb->m_nextpkt = NULL;
464                 if (ssk->rx_ctl_tail)
465                         ssk->rx_ctl_tail->m_nextpkt = mb;
466                 else
467                         ssk->rx_ctl_q = mb;
468                 ssk->rx_ctl_tail = mb;
469
470                 return 0;
471         }
472
473         sdp_prf1(sk, NULL, "queueing %s mb\n", mid2str(h->mid));
474         mb = sdp_sock_queue_rcv_mb(sk, mb);
475
476
477         return 0;
478 }
479
480 /* called only from irq */
481 static struct mbuf *
482 sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
483 {
484         struct mbuf *mb;
485         struct sdp_bsdh *h;
486         struct socket *sk = ssk->socket;
487         int mseq;
488
489         mb = sdp_recv_completion(ssk, wc->wr_id);
490         if (unlikely(!mb))
491                 return NULL;
492
493         if (unlikely(wc->status)) {
494                 if (ssk->qp_active && sk) {
495                         sdp_dbg(sk, "Recv completion with error. "
496                                         "Status %d, vendor: %d\n",
497                                 wc->status, wc->vendor_err);
498                         sdp_abort(sk);
499                         ssk->qp_active = 0;
500                 }
501                 m_freem(mb);
502                 return NULL;
503         }
504
505         sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
506                         (int)wc->wr_id, wc->byte_len);
507         if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
508                 sdp_warn(sk, "SDP BUG! byte_len %d < %zd\n",
509                                 wc->byte_len, sizeof(struct sdp_bsdh));
510                 m_freem(mb);
511                 return NULL;
512         }
513         /* Use m_adj to trim the tail of data we didn't use. */
514         m_adj(mb, -(mb->m_pkthdr.len - wc->byte_len));
515         h = mtod(mb, struct sdp_bsdh *);
516
517         SDP_DUMP_PACKET(ssk->socket, "RX", mb, h);
518
519         ssk->rx_packets++;
520         ssk->rx_bytes += mb->m_pkthdr.len;
521
522         mseq = ntohl(h->mseq);
523         atomic_set(&ssk->mseq_ack, mseq);
524         if (mseq != (int)wc->wr_id)
525                 sdp_warn(sk, "SDP BUG! mseq %d != wrid %d\n",
526                                 mseq, (int)wc->wr_id);
527
528         return mb;
529 }
530
531 /* Wakeup writers if we now have credits. */
532 static void
533 sdp_bzcopy_write_space(struct sdp_sock *ssk)
534 {
535         struct socket *sk = ssk->socket;
536
537         if (tx_credits(ssk) >= ssk->min_bufs && sk)
538                 sowwakeup(sk);
539 }
540
541 /* only from interrupt. */
542 static int
543 sdp_poll_rx_cq(struct sdp_sock *ssk)
544 {
545         struct ib_cq *cq = ssk->rx_ring.cq;
546         struct ib_wc ibwc[SDP_NUM_WC];
547         int n, i;
548         int wc_processed = 0;
549         struct mbuf *mb;
550
551         do {
552                 n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
553                 for (i = 0; i < n; ++i) {
554                         struct ib_wc *wc = &ibwc[i];
555
556                         BUG_ON(!(wc->wr_id & SDP_OP_RECV));
557                         mb = sdp_process_rx_wc(ssk, wc);
558                         if (!mb)
559                                 continue;
560
561                         sdp_process_rx_mb(ssk, mb);
562                         wc_processed++;
563                 }
564         } while (n == SDP_NUM_WC);
565
566         if (wc_processed)
567                 sdp_bzcopy_write_space(ssk);
568
569         return wc_processed;
570 }
571
572 static void
573 sdp_rx_comp_work(struct work_struct *work)
574 {
575         struct sdp_sock *ssk = container_of(work, struct sdp_sock,
576                         rx_comp_work);
577
578         sdp_prf(ssk->socket, NULL, "%s", __func__);
579
580         SDP_WLOCK(ssk);
581         if (unlikely(!ssk->qp)) {
582                 sdp_prf(ssk->socket, NULL, "qp was destroyed");
583                 goto out;
584         }
585         if (unlikely(!ssk->rx_ring.cq)) {
586                 sdp_prf(ssk->socket, NULL, "rx_ring.cq is NULL");
587                 goto out;
588         }
589
590         if (unlikely(!ssk->poll_cq)) {
591                 struct rdma_cm_id *id = ssk->id;
592                 if (id && id->qp)
593                         rdma_notify(id, IB_EVENT_COMM_EST);
594                 goto out;
595         }
596
597         sdp_do_posts(ssk);
598 out:
599         SDP_WUNLOCK(ssk);
600 }
601
602 void
603 sdp_do_posts(struct sdp_sock *ssk)
604 {
605         struct socket *sk = ssk->socket;
606         int xmit_poll_force;
607         struct mbuf *mb;
608
609         SDP_WLOCK_ASSERT(ssk);
610         if (!ssk->qp_active) {
611                 sdp_dbg(sk, "QP is deactivated\n");
612                 return;
613         }
614
615         while ((mb = ssk->rx_ctl_q)) {
616                 ssk->rx_ctl_q = mb->m_nextpkt;
617                 mb->m_nextpkt = NULL;
618                 sdp_process_rx_ctl_mb(ssk, mb);
619         }
620
621         if (ssk->state == TCPS_TIME_WAIT)
622                 return;
623
624         if (!ssk->rx_ring.cq || !ssk->tx_ring.cq)
625                 return;
626
627         sdp_post_recvs(ssk);
628
629         if (tx_ring_posted(ssk))
630                 sdp_xmit_poll(ssk, 1);
631
632         sdp_post_sends(ssk, M_NOWAIT);
633
634         xmit_poll_force = tx_credits(ssk) < SDP_MIN_TX_CREDITS;
635
636         if (credit_update_needed(ssk) || xmit_poll_force) {
637                 /* if has pending tx because run out of tx_credits - xmit it */
638                 sdp_prf(sk, NULL, "Processing to free pending sends");
639                 sdp_xmit_poll(ssk,  xmit_poll_force);
640                 sdp_prf(sk, NULL, "Sending credit update");
641                 sdp_post_sends(ssk, M_NOWAIT);
642         }
643
644 }
645
646 int
647 sdp_process_rx(struct sdp_sock *ssk)
648 {
649         int wc_processed = 0;
650         int credits_before;
651
652         if (!rx_ring_trylock(&ssk->rx_ring)) {
653                 sdp_dbg(ssk->socket, "ring destroyed. not polling it\n");
654                 return 0;
655         }
656
657         credits_before = tx_credits(ssk);
658
659         wc_processed = sdp_poll_rx_cq(ssk);
660         sdp_prf(ssk->socket, NULL, "processed %d", wc_processed);
661
662         if (wc_processed) {
663                 sdp_prf(ssk->socket, NULL, "credits:  %d -> %d",
664                                 credits_before, tx_credits(ssk));
665                 queue_work(rx_comp_wq, &ssk->rx_comp_work);
666         }
667         sdp_arm_rx_cq(ssk);
668
669         rx_ring_unlock(&ssk->rx_ring);
670
671         return (wc_processed);
672 }
673
674 static void
675 sdp_rx_irq(struct ib_cq *cq, void *cq_context)
676 {
677         struct socket *sk = cq_context;
678         struct sdp_sock *ssk = sdp_sk(sk);
679
680         if (cq != ssk->rx_ring.cq) {
681                 sdp_dbg(sk, "cq = %p, ssk->cq = %p\n", cq, ssk->rx_ring.cq);
682                 return;
683         }
684
685         SDPSTATS_COUNTER_INC(rx_int_count);
686
687         sdp_prf(sk, NULL, "rx irq");
688
689         sdp_process_rx(ssk);
690 }
691
692 static
693 void sdp_rx_ring_purge(struct sdp_sock *ssk)
694 {
695         while (rx_ring_posted(ssk) > 0) {
696                 struct mbuf *mb;
697                 mb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
698                 if (!mb)
699                         break;
700                 m_freem(mb);
701         }
702 }
703
704 void
705 sdp_rx_ring_init(struct sdp_sock *ssk)
706 {
707         ssk->rx_ring.buffer = NULL;
708         ssk->rx_ring.destroyed = 0;
709         rw_init(&ssk->rx_ring.destroyed_lock, "sdp rx lock");
710 }
711
712 static void
713 sdp_rx_cq_event_handler(struct ib_event *event, void *data)
714 {
715 }
716
717 int
718 sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
719 {
720         struct ib_cq *rx_cq;
721         int rc = 0;
722
723
724         sdp_dbg(ssk->socket, "rx ring created");
725         INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
726         atomic_set(&ssk->rx_ring.head, 1);
727         atomic_set(&ssk->rx_ring.tail, 1);
728
729         ssk->rx_ring.buffer = kmalloc(
730                         sizeof *ssk->rx_ring.buffer * SDP_RX_SIZE, GFP_KERNEL);
731         if (!ssk->rx_ring.buffer) {
732                 sdp_warn(ssk->socket,
733                         "Unable to allocate RX Ring size %zd.\n",
734                          sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE);
735
736                 return -ENOMEM;
737         }
738
739         rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler,
740                           ssk->socket, SDP_RX_SIZE, IB_CQ_VECTOR_LEAST_ATTACHED);
741
742         if (IS_ERR(rx_cq)) {
743                 rc = PTR_ERR(rx_cq);
744                 sdp_warn(ssk->socket, "Unable to allocate RX CQ: %d.\n", rc);
745                 goto err_cq;
746         }
747
748         sdp_sk(ssk->socket)->rx_ring.cq = rx_cq;
749         sdp_arm_rx_cq(ssk);
750
751         return 0;
752
753 err_cq:
754         kfree(ssk->rx_ring.buffer);
755         ssk->rx_ring.buffer = NULL;
756         return rc;
757 }
758
759 void
760 sdp_rx_ring_destroy(struct sdp_sock *ssk)
761 {
762
763         cancel_work_sync(&ssk->rx_comp_work);
764         rx_ring_destroy_lock(&ssk->rx_ring);
765
766         if (ssk->rx_ring.buffer) {
767                 sdp_rx_ring_purge(ssk);
768
769                 kfree(ssk->rx_ring.buffer);
770                 ssk->rx_ring.buffer = NULL;
771         }
772
773         if (ssk->rx_ring.cq) {
774                 if (ib_destroy_cq(ssk->rx_ring.cq)) {
775                         sdp_warn(ssk->socket, "destroy cq(%p) failed\n",
776                                 ssk->rx_ring.cq);
777                 } else {
778                         ssk->rx_ring.cq = NULL;
779                 }
780         }
781
782         WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));
783 }