]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - sys/ofed/drivers/infiniband/ulp/sdp/sdp_rx.c
sdp: Use the PCB as the rx completion handler argument.
[FreeBSD/FreeBSD.git] / sys / ofed / drivers / infiniband / ulp / sdp / sdp_rx.c
1 /*
2  * Copyright (c) 2009 Mellanox Technologies Ltd.  All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the
8  * OpenIB.org BSD license below:
9  *
10  *     Redistribution and use in source and binary forms, with or
11  *     without modification, are permitted provided that the following
12  *     conditions are met:
13  *
14  *      - Redistributions of source code must retain the above
15  *        copyright notice, this list of conditions and the following
16  *        disclaimer.
17  *
18  *      - Redistributions in binary form must reproduce the above
19  *        copyright notice, this list of conditions and the following
20  *        disclaimer in the documentation and/or other materials
21  *        provided with the distribution.
22  *
23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30  * SOFTWARE.
31  */
32 #include "sdp.h"
33
34 SDP_MODPARAM_INT(rcvbuf_initial_size, 32 * 1024,
35                 "Receive buffer initial size in bytes.");
36 SDP_MODPARAM_SINT(rcvbuf_scale, 0x8,
37                 "Receive buffer size scale factor.");
38
39 /* Like tcp_fin - called when SDP_MID_DISCONNECT is received */
40 static void
41 sdp_handle_disconn(struct sdp_sock *ssk)
42 {
43
44         sdp_dbg(ssk->socket, "%s\n", __func__);
45
46         SDP_WLOCK_ASSERT(ssk);
47         if (TCPS_HAVERCVDFIN(ssk->state) == 0)
48                 socantrcvmore(ssk->socket);
49
50         switch (ssk->state) {
51         case TCPS_SYN_RECEIVED:
52         case TCPS_ESTABLISHED:
53                 ssk->state = TCPS_CLOSE_WAIT;
54                 break;
55
56         case TCPS_FIN_WAIT_1:
57                 /* Received a reply FIN - start Infiniband tear down */
58                 sdp_dbg(ssk->socket,
59                     "%s: Starting Infiniband tear down sending DREQ\n",
60                     __func__);
61
62                 sdp_cancel_dreq_wait_timeout(ssk);
63                 ssk->qp_active = 0;
64                 if (ssk->id) {
65                         struct rdma_cm_id *id;
66
67                         id = ssk->id;
68                         SDP_WUNLOCK(ssk);
69                         rdma_disconnect(id);
70                         SDP_WLOCK(ssk);
71                 } else {
72                         sdp_warn(ssk->socket,
73                             "%s: ssk->id is NULL\n", __func__);
74                         return;
75                 }
76                 break;
77         case TCPS_TIME_WAIT:
78                 /* This is a mutual close situation and we've got the DREQ from
79                    the peer before the SDP_MID_DISCONNECT */
80                 break;
81         case TCPS_CLOSED:
82                 /* FIN arrived after IB teardown started - do nothing */
83                 sdp_dbg(ssk->socket, "%s: fin in state %s\n",
84                     __func__, sdp_state_str(ssk->state));
85                 return;
86         default:
87                 sdp_warn(ssk->socket,
88                     "%s: FIN in unexpected state. state=%d\n",
89                     __func__, ssk->state);
90                 break;
91         }
92 }
93
94 static int
95 sdp_post_recv(struct sdp_sock *ssk)
96 {
97         struct sdp_buf *rx_req;
98         int i, rc;
99         u64 addr;
100         struct ib_device *dev;
101         struct ib_recv_wr rx_wr = { NULL };
102         struct ib_sge ibsge[SDP_MAX_RECV_SGES];
103         struct ib_sge *sge = ibsge;
104         struct ib_recv_wr *bad_wr;
105         struct mbuf *mb, *m;
106         struct sdp_bsdh *h;
107         int id = ring_head(ssk->rx_ring);
108
109         /* Now, allocate and repost recv */
110         sdp_prf(ssk->socket, mb, "Posting mb");
111         mb = m_getm2(NULL, ssk->recv_bytes, M_NOWAIT, MT_DATA, M_PKTHDR);
112         if (mb == NULL) {
113                 /* Retry so we can't stall out with no memory. */
114                 if (!rx_ring_posted(ssk))
115                         queue_work(rx_comp_wq, &ssk->rx_comp_work);
116                 return -1;
117         }
118         for (m = mb; m != NULL; m = m->m_next) {
119                 m->m_len = M_SIZE(m);
120                 mb->m_pkthdr.len += m->m_len;
121         }
122         h = mtod(mb, struct sdp_bsdh *);
123         rx_req = ssk->rx_ring.buffer + (id & (SDP_RX_SIZE - 1));
124         rx_req->mb = mb;
125         dev = ssk->ib_device;
126         for (i = 0;  mb != NULL; i++, mb = mb->m_next, sge++) {
127                 addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
128                     DMA_TO_DEVICE);
129                 /* TODO: proper error handling */
130                 BUG_ON(ib_dma_mapping_error(dev, addr));
131                 BUG_ON(i >= SDP_MAX_RECV_SGES);
132                 rx_req->mapping[i] = addr;
133                 sge->addr = addr;
134                 sge->length = mb->m_len;
135                 sge->lkey = ssk->sdp_dev->mr->lkey;
136         }
137
138         rx_wr.next = NULL;
139         rx_wr.wr_id = id | SDP_OP_RECV;
140         rx_wr.sg_list = ibsge;
141         rx_wr.num_sge = i;
142         rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr);
143         if (unlikely(rc)) {
144                 sdp_warn(ssk->socket, "ib_post_recv failed. status %d\n", rc);
145
146                 sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
147                 m_freem(mb);
148
149                 sdp_notify(ssk, ECONNRESET);
150
151                 return -1;
152         }
153
154         atomic_inc(&ssk->rx_ring.head);
155         SDPSTATS_COUNTER_INC(post_recv);
156
157         return 0;
158 }
159
160 static inline int
161 sdp_post_recvs_needed(struct sdp_sock *ssk)
162 {
163         unsigned long bytes_in_process;
164         unsigned long max_bytes;
165         int buffer_size;
166         int posted;
167
168         if (!ssk->qp_active || !ssk->socket)
169                 return 0;
170
171         posted = rx_ring_posted(ssk);
172         if (posted >= SDP_RX_SIZE)
173                 return 0;
174         if (posted < SDP_MIN_TX_CREDITS)
175                 return 1;
176
177         buffer_size = ssk->recv_bytes;
178         max_bytes = max(ssk->socket->so_snd.sb_hiwat,
179             (1 + SDP_MIN_TX_CREDITS) * buffer_size);
180         max_bytes *= rcvbuf_scale;
181         /*
182          * Compute bytes in the receive queue and socket buffer.
183          */
184         bytes_in_process = (posted - SDP_MIN_TX_CREDITS) * buffer_size;
185         bytes_in_process += sbused(&ssk->socket->so_rcv);
186
187         return bytes_in_process < max_bytes;
188 }
189
190 static inline void
191 sdp_post_recvs(struct sdp_sock *ssk)
192 {
193
194         while (sdp_post_recvs_needed(ssk))
195                 if (sdp_post_recv(ssk))
196                         return;
197 }
198
199 static inline struct mbuf *
200 sdp_sock_queue_rcv_mb(struct socket *sk, struct mbuf *mb)
201 {
202         struct sdp_sock *ssk = sdp_sk(sk);
203         struct sdp_bsdh *h;
204
205         h = mtod(mb, struct sdp_bsdh *);
206
207 #ifdef SDP_ZCOPY
208         SDP_SKB_CB(mb)->seq = rcv_nxt(ssk);
209         if (h->mid == SDP_MID_SRCAVAIL) {
210                 struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
211                 struct rx_srcavail_state *rx_sa;
212                 
213                 ssk->srcavail_cancel_mseq = 0;
214
215                 ssk->rx_sa = rx_sa = RX_SRCAVAIL_STATE(mb) = kzalloc(
216                                 sizeof(struct rx_srcavail_state), M_NOWAIT);
217
218                 rx_sa->mseq = ntohl(h->mseq);
219                 rx_sa->used = 0;
220                 rx_sa->len = mb_len = ntohl(srcah->len);
221                 rx_sa->rkey = ntohl(srcah->rkey);
222                 rx_sa->vaddr = be64_to_cpu(srcah->vaddr);
223                 rx_sa->flags = 0;
224
225                 if (ssk->tx_sa) {
226                         sdp_dbg_data(ssk->socket, "got RX SrcAvail while waiting "
227                                         "for TX SrcAvail. waking up TX SrcAvail"
228                                         "to be aborted\n");
229                         wake_up(sk->sk_sleep);
230                 }
231
232                 atomic_add(mb->len, &ssk->rcv_nxt);
233                 sdp_dbg_data(sk, "queueing SrcAvail. mb_len = %d vaddr = %lld\n",
234                         mb_len, rx_sa->vaddr);
235         } else
236 #endif
237         {
238                 atomic_add(mb->m_pkthdr.len, &ssk->rcv_nxt);
239         }
240
241         m_adj(mb, SDP_HEAD_SIZE);
242         SOCKBUF_LOCK(&sk->so_rcv);
243         if (unlikely(h->flags & SDP_OOB_PRES))
244                 sdp_urg(ssk, mb);
245         sbappend_locked(&sk->so_rcv, mb, 0);
246         sorwakeup_locked(sk);
247         return mb;
248 }
249
250 static int
251 sdp_get_recv_bytes(struct sdp_sock *ssk, u32 new_size)
252 {
253
254         return MIN(new_size, SDP_MAX_PACKET);
255 }
256
257 int
258 sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
259 {
260
261         ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
262         sdp_post_recvs(ssk);
263
264         return 0;
265 }
266
267 int
268 sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
269 {
270         u32 curr_size = ssk->recv_bytes;
271         u32 max_size = SDP_MAX_PACKET;
272
273         if (new_size > curr_size && new_size <= max_size) {
274                 ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
275                 return 0;
276         }
277         return -1;
278 }
279
280 static void
281 sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
282 {
283         if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
284                 ssk->recv_request_head = ring_head(ssk->rx_ring) + 1;
285         else
286                 ssk->recv_request_head = ring_tail(ssk->rx_ring);
287         ssk->recv_request = 1;
288 }
289
290 static void
291 sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
292 {
293         u32 new_size = ntohl(buf->size);
294
295         if (new_size > ssk->xmit_size_goal)
296                 ssk->xmit_size_goal = new_size;
297 }
298
299 static struct mbuf *
300 sdp_recv_completion(struct sdp_sock *ssk, int id)
301 {
302         struct sdp_buf *rx_req;
303         struct ib_device *dev;
304         struct mbuf *mb;
305
306         if (unlikely(id != ring_tail(ssk->rx_ring))) {
307                 printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
308                         id, ring_tail(ssk->rx_ring));
309                 return NULL;
310         }
311
312         dev = ssk->ib_device;
313         rx_req = &ssk->rx_ring.buffer[id & (SDP_RX_SIZE - 1)];
314         mb = rx_req->mb;
315         sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
316
317         atomic_inc(&ssk->rx_ring.tail);
318         atomic_dec(&ssk->remote_credits);
319         return mb;
320 }
321
322 /* socket lock should be taken before calling this */
323 static int
324 sdp_process_rx_ctl_mb(struct sdp_sock *ssk, struct mbuf *mb)
325 {
326         struct sdp_bsdh *h;
327         struct socket *sk;
328
329         SDP_WLOCK_ASSERT(ssk);
330         sk = ssk->socket;
331         h = mtod(mb, struct sdp_bsdh *);
332         switch (h->mid) {
333         case SDP_MID_DATA:
334         case SDP_MID_SRCAVAIL:
335                 sdp_dbg(sk, "DATA after socket rcv was shutdown\n");
336
337                 /* got data in RCV_SHUTDOWN */
338                 if (ssk->state == TCPS_FIN_WAIT_1) {
339                         sdp_dbg(sk, "RX data when state = FIN_WAIT1\n");
340                         sdp_notify(ssk, ECONNRESET);
341                 }
342                 m_freem(mb);
343
344                 break;
345 #ifdef SDP_ZCOPY
346         case SDP_MID_RDMARDCOMPL:
347                 m_freem(mb);
348                 break;
349         case SDP_MID_SENDSM:
350                 sdp_handle_sendsm(ssk, ntohl(h->mseq_ack));
351                 m_freem(mb);
352                 break;
353         case SDP_MID_SRCAVAIL_CANCEL:
354                 sdp_dbg_data(sk, "Handling SrcAvailCancel\n");
355                 sdp_prf(sk, NULL, "Handling SrcAvailCancel");
356                 if (ssk->rx_sa) {
357                         ssk->srcavail_cancel_mseq = ntohl(h->mseq);
358                         ssk->rx_sa->flags |= RX_SA_ABORTED;
359                         ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get 
360                                               the dirty logic from recvmsg */
361                 } else {
362                         sdp_dbg(sk, "Got SrcAvailCancel - "
363                                         "but no SrcAvail in process\n");
364                 }
365                 m_freem(mb);
366                 break;
367         case SDP_MID_SINKAVAIL:
368                 sdp_dbg_data(sk, "Got SinkAvail - not supported: ignored\n");
369                 sdp_prf(sk, NULL, "Got SinkAvail - not supported: ignored");
370                 /* FALLTHROUGH */
371 #endif
372         case SDP_MID_ABORT:
373                 sdp_dbg_data(sk, "Handling ABORT\n");
374                 sdp_prf(sk, NULL, "Handling ABORT");
375                 sdp_notify(ssk, ECONNRESET);
376                 m_freem(mb);
377                 break;
378         case SDP_MID_DISCONN:
379                 sdp_dbg_data(sk, "Handling DISCONN\n");
380                 sdp_prf(sk, NULL, "Handling DISCONN");
381                 sdp_handle_disconn(ssk);
382                 break;
383         case SDP_MID_CHRCVBUF:
384                 sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
385                 sdp_handle_resize_request(ssk, (struct sdp_chrecvbuf *)(h+1));
386                 m_freem(mb);
387                 break;
388         case SDP_MID_CHRCVBUF_ACK:
389                 sdp_dbg_data(sk, "Handling RX CHRCVBUF_ACK\n");
390                 sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)(h+1));
391                 m_freem(mb);
392                 break;
393         default:
394                 /* TODO: Handle other messages */
395                 sdp_warn(sk, "SDP: FIXME MID %d\n", h->mid);
396                 m_freem(mb);
397         }
398
399         return 0;
400 }
401
402 static int
403 sdp_process_rx_mb(struct sdp_sock *ssk, struct mbuf *mb)
404 {
405         struct socket *sk;
406         struct sdp_bsdh *h;
407         unsigned long mseq_ack;
408         int credits_before;
409
410         h = mtod(mb, struct sdp_bsdh *);
411         sk = ssk->socket;
412         /*
413          * If another thread is in so_pcbfree this may be partially torn
414          * down but no further synchronization is required as the destroying
415          * thread will wait for receive to shutdown before discarding the
416          * socket.
417          */
418         if (sk == NULL) {
419                 m_freem(mb);
420                 return 0;
421         }
422
423         SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));
424
425         mseq_ack = ntohl(h->mseq_ack);
426         credits_before = tx_credits(ssk);
427         atomic_set(&ssk->tx_ring.credits, mseq_ack - ring_head(ssk->tx_ring) +
428                         1 + ntohs(h->bufs));
429         if (mseq_ack >= ssk->nagle_last_unacked)
430                 ssk->nagle_last_unacked = 0;
431
432         sdp_prf1(ssk->socket, mb, "RX %s +%d c:%d->%d mseq:%d ack:%d\n",
433                 mid2str(h->mid), ntohs(h->bufs), credits_before,
434                 tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));
435
436         if (unlikely(h->mid == SDP_MID_DATA &&
437             mb->m_pkthdr.len == SDP_HEAD_SIZE)) {
438                 /* Credit update is valid even after RCV_SHUTDOWN */
439                 m_freem(mb);
440                 return 0;
441         }
442
443         if ((h->mid != SDP_MID_DATA && h->mid != SDP_MID_SRCAVAIL) ||
444             TCPS_HAVERCVDFIN(ssk->state)) {
445                 sdp_prf(sk, NULL, "Control mb - queing to control queue");
446 #ifdef SDP_ZCOPY
447                 if (h->mid == SDP_MID_SRCAVAIL_CANCEL) {
448                         sdp_dbg_data(sk, "Got SrcAvailCancel. "
449                                         "seq: 0x%d seq_ack: 0x%d\n",
450                                         ntohl(h->mseq), ntohl(h->mseq_ack));
451                         ssk->srcavail_cancel_mseq = ntohl(h->mseq);
452                 }
453
454
455                 if (h->mid == SDP_MID_RDMARDCOMPL) {
456                         struct sdp_rrch *rrch = (struct sdp_rrch *)(h+1);
457                         sdp_dbg_data(sk, "RdmaRdCompl message arrived\n");
458                         sdp_handle_rdma_read_compl(ssk, ntohl(h->mseq_ack),
459                                         ntohl(rrch->len));
460                 }
461 #endif
462                 if (mbufq_enqueue(&ssk->rxctlq, mb) != 0)
463                         m_freem(mb);
464                 return (0);
465         }
466
467         sdp_prf1(sk, NULL, "queueing %s mb\n", mid2str(h->mid));
468         mb = sdp_sock_queue_rcv_mb(sk, mb);
469
470
471         return 0;
472 }
473
474 /* called only from irq */
475 static struct mbuf *
476 sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
477 {
478         struct mbuf *mb;
479         struct sdp_bsdh *h;
480         struct socket *sk = ssk->socket;
481         int mseq;
482
483         mb = sdp_recv_completion(ssk, wc->wr_id);
484         if (unlikely(!mb))
485                 return NULL;
486
487         if (unlikely(wc->status)) {
488                 if (ssk->qp_active && sk) {
489                         sdp_dbg(sk, "Recv completion with error. "
490                                         "Status %d, vendor: %d\n",
491                                 wc->status, wc->vendor_err);
492                         sdp_abort(sk);
493                         ssk->qp_active = 0;
494                 }
495                 m_freem(mb);
496                 return NULL;
497         }
498
499         sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
500                         (int)wc->wr_id, wc->byte_len);
501         if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
502                 sdp_warn(sk, "SDP BUG! byte_len %d < %zd\n",
503                                 wc->byte_len, sizeof(struct sdp_bsdh));
504                 m_freem(mb);
505                 return NULL;
506         }
507         /* Use m_adj to trim the tail of data we didn't use. */
508         m_adj(mb, -(mb->m_pkthdr.len - wc->byte_len));
509         h = mtod(mb, struct sdp_bsdh *);
510
511         SDP_DUMP_PACKET(ssk->socket, "RX", mb, h);
512
513         ssk->rx_packets++;
514         ssk->rx_bytes += mb->m_pkthdr.len;
515
516         mseq = ntohl(h->mseq);
517         atomic_set(&ssk->mseq_ack, mseq);
518         if (mseq != (int)wc->wr_id)
519                 sdp_warn(sk, "SDP BUG! mseq %d != wrid %d\n",
520                                 mseq, (int)wc->wr_id);
521
522         return mb;
523 }
524
525 /* Wakeup writers if we now have credits. */
526 static void
527 sdp_bzcopy_write_space(struct sdp_sock *ssk)
528 {
529         struct socket *sk = ssk->socket;
530
531         if (tx_credits(ssk) >= ssk->min_bufs && sk)
532                 sowwakeup(sk);
533 }
534
535 /* only from interrupt. */
536 static int
537 sdp_poll_rx_cq(struct sdp_sock *ssk)
538 {
539         struct ib_cq *cq = ssk->rx_ring.cq;
540         struct ib_wc ibwc[SDP_NUM_WC];
541         int n, i;
542         int wc_processed = 0;
543         struct mbuf *mb;
544
545         do {
546                 n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
547                 for (i = 0; i < n; ++i) {
548                         struct ib_wc *wc = &ibwc[i];
549
550                         BUG_ON(!(wc->wr_id & SDP_OP_RECV));
551                         mb = sdp_process_rx_wc(ssk, wc);
552                         if (!mb)
553                                 continue;
554
555                         sdp_process_rx_mb(ssk, mb);
556                         wc_processed++;
557                 }
558         } while (n == SDP_NUM_WC);
559
560         if (wc_processed)
561                 sdp_bzcopy_write_space(ssk);
562
563         return wc_processed;
564 }
565
566 static void
567 sdp_rx_comp_work(struct work_struct *work)
568 {
569         struct sdp_sock *ssk = container_of(work, struct sdp_sock,
570                         rx_comp_work);
571
572         sdp_prf(ssk->socket, NULL, "%s", __func__);
573
574         SDP_WLOCK(ssk);
575         if (unlikely(!ssk->qp)) {
576                 sdp_prf(ssk->socket, NULL, "qp was destroyed");
577                 goto out;
578         }
579         if (unlikely(!ssk->rx_ring.cq)) {
580                 sdp_prf(ssk->socket, NULL, "rx_ring.cq is NULL");
581                 goto out;
582         }
583
584         if (unlikely(!ssk->poll_cq)) {
585                 struct rdma_cm_id *id = ssk->id;
586                 if (id && id->qp)
587                         rdma_notify(id, IB_EVENT_COMM_EST);
588                 goto out;
589         }
590
591         sdp_do_posts(ssk);
592 out:
593         SDP_WUNLOCK(ssk);
594 }
595
596 void
597 sdp_do_posts(struct sdp_sock *ssk)
598 {
599         struct socket *sk = ssk->socket;
600         int xmit_poll_force;
601         struct mbuf *mb;
602
603         SDP_WLOCK_ASSERT(ssk);
604         if (!ssk->qp_active) {
605                 sdp_dbg(sk, "QP is deactivated\n");
606                 return;
607         }
608
609         while ((mb = mbufq_dequeue(&ssk->rxctlq)) != NULL)
610                 sdp_process_rx_ctl_mb(ssk, mb);
611
612         if (ssk->state == TCPS_TIME_WAIT)
613                 return;
614
615         if (!ssk->rx_ring.cq || !ssk->tx_ring.cq)
616                 return;
617
618         sdp_post_recvs(ssk);
619
620         if (tx_ring_posted(ssk))
621                 sdp_xmit_poll(ssk, 1);
622
623         sdp_post_sends(ssk, M_NOWAIT);
624
625         xmit_poll_force = tx_credits(ssk) < SDP_MIN_TX_CREDITS;
626
627         if (credit_update_needed(ssk) || xmit_poll_force) {
628                 /* if has pending tx because run out of tx_credits - xmit it */
629                 sdp_prf(sk, NULL, "Processing to free pending sends");
630                 sdp_xmit_poll(ssk,  xmit_poll_force);
631                 sdp_prf(sk, NULL, "Sending credit update");
632                 sdp_post_sends(ssk, M_NOWAIT);
633         }
634
635 }
636
637 int
638 sdp_process_rx(struct sdp_sock *ssk)
639 {
640         int wc_processed = 0;
641         int credits_before;
642
643         if (!rx_ring_trylock(&ssk->rx_ring)) {
644                 sdp_dbg(ssk->socket, "ring destroyed. not polling it\n");
645                 return 0;
646         }
647
648         credits_before = tx_credits(ssk);
649
650         wc_processed = sdp_poll_rx_cq(ssk);
651         sdp_prf(ssk->socket, NULL, "processed %d", wc_processed);
652
653         if (wc_processed) {
654                 sdp_prf(ssk->socket, NULL, "credits:  %d -> %d",
655                                 credits_before, tx_credits(ssk));
656                 queue_work(rx_comp_wq, &ssk->rx_comp_work);
657         }
658         sdp_arm_rx_cq(ssk);
659
660         rx_ring_unlock(&ssk->rx_ring);
661
662         return (wc_processed);
663 }
664
665 static void
666 sdp_rx_irq(struct ib_cq *cq, void *cq_context)
667 {
668         struct sdp_sock *ssk;
669
670         ssk = cq_context;
671         KASSERT(cq == ssk->rx_ring.cq, ("%s: mismatched cq on %p", ssk));
672
673         SDPSTATS_COUNTER_INC(rx_int_count);
674
675         sdp_prf(sk, NULL, "rx irq");
676
677         sdp_process_rx(ssk);
678 }
679
680 static
681 void sdp_rx_ring_purge(struct sdp_sock *ssk)
682 {
683         while (rx_ring_posted(ssk) > 0) {
684                 struct mbuf *mb;
685                 mb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
686                 if (!mb)
687                         break;
688                 m_freem(mb);
689         }
690 }
691
692 void
693 sdp_rx_ring_init(struct sdp_sock *ssk)
694 {
695         ssk->rx_ring.buffer = NULL;
696         ssk->rx_ring.destroyed = 0;
697         rw_init(&ssk->rx_ring.destroyed_lock, "sdp rx lock");
698 }
699
700 static void
701 sdp_rx_cq_event_handler(struct ib_event *event, void *data)
702 {
703 }
704
705 int
706 sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
707 {
708         struct ib_cq *rx_cq;
709         int rc = 0;
710
711
712         sdp_dbg(ssk->socket, "rx ring created");
713         INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
714         atomic_set(&ssk->rx_ring.head, 1);
715         atomic_set(&ssk->rx_ring.tail, 1);
716
717         ssk->rx_ring.buffer = kmalloc(
718                         sizeof *ssk->rx_ring.buffer * SDP_RX_SIZE, GFP_KERNEL);
719         if (!ssk->rx_ring.buffer) {
720                 sdp_warn(ssk->socket,
721                         "Unable to allocate RX Ring size %zd.\n",
722                          sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE);
723
724                 return -ENOMEM;
725         }
726
727         rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler,
728             ssk, SDP_RX_SIZE, 0);
729
730         if (IS_ERR(rx_cq)) {
731                 rc = PTR_ERR(rx_cq);
732                 sdp_warn(ssk->socket, "Unable to allocate RX CQ: %d.\n", rc);
733                 goto err_cq;
734         }
735
736         sdp_sk(ssk->socket)->rx_ring.cq = rx_cq;
737         sdp_arm_rx_cq(ssk);
738
739         return 0;
740
741 err_cq:
742         kfree(ssk->rx_ring.buffer);
743         ssk->rx_ring.buffer = NULL;
744         return rc;
745 }
746
747 void
748 sdp_rx_ring_destroy(struct sdp_sock *ssk)
749 {
750
751         cancel_work_sync(&ssk->rx_comp_work);
752         rx_ring_destroy_lock(&ssk->rx_ring);
753
754         if (ssk->rx_ring.buffer) {
755                 sdp_rx_ring_purge(ssk);
756
757                 kfree(ssk->rx_ring.buffer);
758                 ssk->rx_ring.buffer = NULL;
759         }
760
761         if (ssk->rx_ring.cq) {
762                 if (ib_destroy_cq(ssk->rx_ring.cq)) {
763                         sdp_warn(ssk->socket, "destroy cq(%p) failed\n",
764                                 ssk->rx_ring.cq);
765                 } else {
766                         ssk->rx_ring.cq = NULL;
767                 }
768         }
769
770         WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));
771 }