/*
 * Copyright (c) 2009 Mellanox Technologies Ltd. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include "sdp.h"

SDP_MODPARAM_INT(rcvbuf_initial_size, 32 * 1024,
    "Receive buffer initial size in bytes.");
SDP_MODPARAM_SINT(rcvbuf_scale, 0x8,
    "Receive buffer size scale factor.");

/* Like tcp_fin - called when SDP_MID_DISCONNECT is received */
static void
sdp_handle_disconn(struct sdp_sock *ssk)
{

    sdp_dbg(ssk->socket, "%s\n", __func__);

    SDP_WLOCK_ASSERT(ssk);
    if (TCPS_HAVERCVDFIN(ssk->state) == 0)
        socantrcvmore(ssk->socket);

    switch (ssk->state) {
    case TCPS_SYN_RECEIVED:
    case TCPS_ESTABLISHED:
        ssk->state = TCPS_CLOSE_WAIT;
        break;
    case TCPS_FIN_WAIT_1:
        /* Received a reply FIN - start Infiniband tear down */
        sdp_dbg(ssk->socket,
            "%s: Starting Infiniband tear down sending DREQ\n",
            __func__);
        sdp_cancel_dreq_wait_timeout(ssk);
        ssk->qp_active = 0;
        if (ssk->id) {
            struct rdma_cm_id *id;

            id = ssk->id;
            SDP_WUNLOCK(ssk);
            rdma_disconnect(id);
            SDP_WLOCK(ssk);
        } else {
            sdp_warn(ssk->socket,
                "%s: ssk->id is NULL\n", __func__);
            return;
        }
        break;
    case TCPS_TIME_WAIT:
        /* This is a mutual close situation and we've got the DREQ from
           the peer before the SDP_MID_DISCONNECT */
        break;
    case TCPS_CLOSED:
        /* FIN arrived after IB teardown started - do nothing */
        sdp_dbg(ssk->socket, "%s: fin in state %s\n",
            __func__, sdp_state_str(ssk->state));
        return;
    default:
        sdp_warn(ssk->socket,
            "%s: FIN in unexpected state. state=%d\n",
            __func__, ssk->state);
        break;
    }
}

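/*
 * Post a single receive buffer: allocate an mbuf chain big enough for
 * ssk->recv_bytes, DMA-map every mbuf in the chain into its own
 * scatter/gather entry, and hand the chain to the QP.  The ring slot index
 * is encoded in the work request id (tagged with SDP_OP_RECV) so the
 * completion handler can locate the matching sdp_buf.
 */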
static int
sdp_post_recv(struct sdp_sock *ssk)
{
    struct sdp_buf *rx_req;
    int i, rc;
    u64 addr;
    struct ib_device *dev;
    struct ib_recv_wr rx_wr = { NULL };
    struct ib_sge ibsge[SDP_MAX_RECV_SGES];
    struct ib_sge *sge = ibsge;
    struct ib_recv_wr *bad_wr;
    struct mbuf *mb, *m;
    struct sdp_bsdh *h;
    int id = ring_head(ssk->rx_ring);

    /* Now, allocate and repost recv */
    mb = m_getm2(NULL, ssk->recv_bytes, M_NOWAIT, MT_DATA, M_PKTHDR);
    if (mb == NULL) {
        /* Retry so we can't stall out with no memory. */
        if (!rx_ring_posted(ssk))
            queue_work(rx_comp_wq, &ssk->rx_comp_work);
        return -1;
    }
    sdp_prf(ssk->socket, mb, "Posting mb");
    for (m = mb; m != NULL; m = m->m_next) {
        m->m_len = (m->m_flags & M_EXT) ? m->m_ext.ext_size :
            ((m->m_flags & M_PKTHDR) ? MHLEN : MLEN);
        mb->m_pkthdr.len += m->m_len;
    }
    h = mtod(mb, struct sdp_bsdh *);
    rx_req = ssk->rx_ring.buffer + (id & (SDP_RX_SIZE - 1));
    rx_req->mb = mb;
    dev = ssk->ib_device;
    for (i = 0; mb != NULL; i++, mb = mb->m_next, sge++) {
        addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
            DMA_FROM_DEVICE);
        /* TODO: proper error handling */
        BUG_ON(ib_dma_mapping_error(dev, addr));
        BUG_ON(i >= SDP_MAX_RECV_SGES);
        rx_req->mapping[i] = addr;
        sge->addr = addr;
        sge->length = mb->m_len;
        sge->lkey = ssk->sdp_dev->mr->lkey;
    }

    rx_wr.wr_id = id | SDP_OP_RECV;
    rx_wr.sg_list = ibsge;
    rx_wr.num_sge = i;
    rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr);
    if (unlikely(rc)) {
        sdp_warn(ssk->socket, "ib_post_recv failed. status %d\n", rc);
        sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
        m_freem(rx_req->mb);
        sdp_notify(ssk, ECONNRESET);
        return -1;
    }

    atomic_inc(&ssk->rx_ring.head);
    SDPSTATS_COUNTER_INC(post_recv);

    return 0;
}

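/*
 * Flow-control heuristic for receive posting.  At least SDP_MIN_TX_CREDITS
 * buffers are always kept posted so the peer never starves for credits;
 * beyond that, posting stops once the bytes pinned in outstanding buffers
 * plus the data already queued in so_rcv exceed the high-water mark scaled
 * by the rcvbuf_scale tunable.
 */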
static inline int
sdp_post_recvs_needed(struct sdp_sock *ssk)
{
    unsigned long bytes_in_process;
    unsigned long max_bytes;
    int buffer_size;
    int posted;

    if (!ssk->qp_active || !ssk->socket)
        return 0;

    posted = rx_ring_posted(ssk);
    if (posted >= SDP_RX_SIZE)
        return 0;
    if (posted < SDP_MIN_TX_CREDITS)
        return 1;

    buffer_size = ssk->recv_bytes;
    max_bytes = max(ssk->socket->so_snd.sb_hiwat,
        (1 + SDP_MIN_TX_CREDITS) * buffer_size);
    max_bytes *= rcvbuf_scale;
    /*
     * Compute bytes in the receive queue and socket buffer.
     */
    bytes_in_process = (posted - SDP_MIN_TX_CREDITS) * buffer_size;
    bytes_in_process += ssk->socket->so_rcv.sb_cc;

    return bytes_in_process < max_bytes;
}

static inline void
sdp_post_recvs(struct sdp_sock *ssk)
{

    while (sdp_post_recvs_needed(ssk))
        if (sdp_post_recv(ssk))
            return;
}

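/*
 * Append received data to the socket buffer.  The SDP base sockets direct
 * header (BSDH) is trimmed off first; urgent data flagged by SDP_OOB_PRES
 * is signalled before readers are woken.  The SDP_ZCOPY path additionally
 * records SrcAvail state for RDMA zero-copy reads.
 */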
static inline struct mbuf *
sdp_sock_queue_rcv_mb(struct socket *sk, struct mbuf *mb)
{
    struct sdp_sock *ssk = sdp_sk(sk);
    struct sdp_bsdh *h;

    h = mtod(mb, struct sdp_bsdh *);

#ifdef SDP_ZCOPY
    SDP_SKB_CB(mb)->seq = rcv_nxt(ssk);
    if (h->mid == SDP_MID_SRCAVAIL) {
        struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
        struct rx_srcavail_state *rx_sa;
        int mb_len;

        ssk->srcavail_cancel_mseq = 0;

        ssk->rx_sa = rx_sa = RX_SRCAVAIL_STATE(mb) = kzalloc(
            sizeof(struct rx_srcavail_state), M_NOWAIT);

        rx_sa->mseq = ntohl(h->mseq);
        rx_sa->used = 0;
        rx_sa->len = mb_len = ntohl(srcah->len);
        rx_sa->rkey = ntohl(srcah->rkey);
        rx_sa->vaddr = be64_to_cpu(srcah->vaddr);
        rx_sa->flags = 0;

        if (ssk->tx_sa) {
            sdp_dbg_data(ssk->socket, "got RX SrcAvail while waiting "
                "for TX SrcAvail. waking up TX SrcAvail "
                "sleepers\n");
            wake_up(sk->sk_sleep);
        }

        atomic_add(mb->len, &ssk->rcv_nxt);
        sdp_dbg_data(sk, "queueing SrcAvail. mb_len = %d vaddr = %lld\n",
            mb_len, rx_sa->vaddr);
    } else
#endif
    {
        atomic_add(mb->m_pkthdr.len, &ssk->rcv_nxt);
    }

    m_adj(mb, SDP_HEAD_SIZE);
    SOCKBUF_LOCK(&sk->so_rcv);
    if (unlikely(h->flags & SDP_OOB_PRES))
        sdp_urg(ssk, mb);
    sbappend_locked(&sk->so_rcv, mb);
    sorwakeup_locked(sk);
    return mb;
}

static int
sdp_get_recv_bytes(struct sdp_sock *ssk, u32 new_size)
{

    return MIN(new_size, SDP_MAX_PACKET);
}

int
sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
{

    ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
    sdp_post_recvs(ssk);

    return 0;
}

int
sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
{
    u32 curr_size = ssk->recv_bytes;
    u32 max_size = SDP_MAX_PACKET;

    if (new_size > curr_size && new_size <= max_size) {
        ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
        return 0;
    }
    return -1;
}

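/*
 * Peer-initiated receive buffer resize (ChRcvBuf).  On success the ring
 * head is recorded so the ack can indicate from which buffer the new size
 * takes effect; on failure the current tail is recorded instead.
 * recv_request marks that a ChRcvBufAck still needs to be transmitted.
 */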
static void
sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
{
    if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
        ssk->recv_request_head = ring_head(ssk->rx_ring) + 1;
    else
        ssk->recv_request_head = ring_tail(ssk->rx_ring);
    ssk->recv_request = 1;
}

static void
sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
{
    u32 new_size = ntohl(buf->size);

    if (new_size > ssk->xmit_size_goal)
        ssk->xmit_size_goal = new_size;
}

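/*
 * Reclaim the mbuf chain for a completed receive.  Completions must arrive
 * in ring order, so the id is sanity-checked against the ring tail before
 * the buffer is unmapped.  Consuming a receive also consumes one credit
 * previously advertised to the peer, hence the remote_credits decrement.
 */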
static struct mbuf *
sdp_recv_completion(struct sdp_sock *ssk, int id)
{
    struct sdp_buf *rx_req;
    struct ib_device *dev;
    struct mbuf *mb;

    if (unlikely(id != ring_tail(ssk->rx_ring))) {
        printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
            id, ring_tail(ssk->rx_ring));
        return NULL;
    }

    dev = ssk->ib_device;
    rx_req = &ssk->rx_ring.buffer[id & (SDP_RX_SIZE - 1)];
    mb = rx_req->mb;
    sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);

    atomic_inc(&ssk->rx_ring.tail);
    atomic_dec(&ssk->remote_credits);
    return mb;
}

/* socket lock should be taken before calling this */
static void
sdp_process_rx_ctl_mb(struct sdp_sock *ssk, struct mbuf *mb)
{
    struct sdp_bsdh *h;
    struct socket *sk;

    SDP_WLOCK_ASSERT(ssk);

    sk = ssk->socket;
    h = mtod(mb, struct sdp_bsdh *);
    switch (h->mid) {
    case SDP_MID_DATA:
    case SDP_MID_SRCAVAIL:
        sdp_dbg(sk, "DATA after socket rcv was shutdown\n");

        /* got data in RCV_SHUTDOWN */
        if (ssk->state == TCPS_FIN_WAIT_1) {
            sdp_dbg(sk, "RX data when state = FIN_WAIT1\n");
            sdp_notify(ssk, ECONNRESET);
        }
        m_freem(mb);
        break;
#ifdef SDP_ZCOPY
    case SDP_MID_RDMARDCOMPL:
        m_freem(mb);
        break;
    case SDP_MID_SENDSM:
        sdp_handle_sendsm(ssk, ntohl(h->mseq_ack));
        m_freem(mb);
        break;
    case SDP_MID_SRCAVAIL_CANCEL:
        sdp_dbg_data(sk, "Handling SrcAvailCancel\n");
        sdp_prf(sk, NULL, "Handling SrcAvailCancel");
        if (ssk->rx_sa) {
            ssk->srcavail_cancel_mseq = ntohl(h->mseq);
            ssk->rx_sa->flags |= RX_SA_ABORTED;
            ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get
                                  the dirty logic from recvmsg */
        } else {
            sdp_dbg(sk, "Got SrcAvailCancel - "
                "but no SrcAvail in process\n");
        }
        m_freem(mb);
        break;
    case SDP_MID_SINKAVAIL:
        sdp_dbg_data(sk, "Got SinkAvail - not supported: ignored\n");
        sdp_prf(sk, NULL, "Got SinkAvail - not supported: ignored");
        /* FALLTHROUGH */
#endif
    case SDP_MID_ABORT:
        sdp_dbg_data(sk, "Handling ABORT\n");
        sdp_prf(sk, NULL, "Handling ABORT");
        sdp_notify(ssk, ECONNRESET);
        m_freem(mb);
        break;
    case SDP_MID_DISCONN:
        sdp_dbg_data(sk, "Handling DISCONN\n");
        sdp_prf(sk, NULL, "Handling DISCONN");
        sdp_handle_disconn(ssk);
        m_freem(mb);
        break;
    case SDP_MID_CHRCVBUF:
        sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
        sdp_handle_resize_request(ssk, (struct sdp_chrecvbuf *)(h+1));
        m_freem(mb);
        break;
    case SDP_MID_CHRCVBUF_ACK:
        sdp_dbg_data(sk, "Handling RX CHRCVBUF_ACK\n");
        sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)(h+1));
        m_freem(mb);
        break;
    default:
        /* TODO: Handle other messages */
        sdp_warn(sk, "SDP: FIXME MID %d\n", h->mid);
        m_freem(mb);
    }
}

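/*
 * First-stage RX processing, called from the completion path.  Every BSDH
 * piggy-backs a credit update: the new tx credit count is derived from the
 * peer's acked sequence number (mseq_ack) and advertised buffer count
 * (bufs) relative to our tx ring head.  Header-only SDP_MID_DATA packets
 * are pure credit updates and are freed once the credits are harvested;
 * control messages are chained on rx_ctl_q for the socket lock holder;
 * in-band data is queued on the socket directly.
 */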
static int
sdp_process_rx_mb(struct sdp_sock *ssk, struct mbuf *mb)
{
    struct socket *sk;
    struct sdp_bsdh *h;
    unsigned long mseq_ack;
    int credits_before;

    h = mtod(mb, struct sdp_bsdh *);
    sk = ssk->socket;
    /*
     * If another thread is in so_pcbfree this may be partially torn
     * down but no further synchronization is required as the destroying
     * thread will wait for receive to shutdown before discarding the
     * socket.
     */
    if (sk == NULL) {
        m_freem(mb);
        return 0;
    }

    SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));

    mseq_ack = ntohl(h->mseq_ack);
    credits_before = tx_credits(ssk);
    atomic_set(&ssk->tx_ring.credits, mseq_ack - ring_head(ssk->tx_ring) +
        1 + ntohs(h->bufs));
    if (mseq_ack >= ssk->nagle_last_unacked)
        ssk->nagle_last_unacked = 0;

    sdp_prf1(ssk->socket, mb, "RX %s +%d c:%d->%d mseq:%d ack:%d\n",
        mid2str(h->mid), ntohs(h->bufs), credits_before,
        tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));

    if (unlikely(h->mid == SDP_MID_DATA &&
        mb->m_pkthdr.len == SDP_HEAD_SIZE)) {
        /* Credit update is valid even after RCV_SHUTDOWN */
        m_freem(mb);
        return 0;
    }

    if ((h->mid != SDP_MID_DATA && h->mid != SDP_MID_SRCAVAIL) ||
        TCPS_HAVERCVDFIN(ssk->state)) {
        sdp_prf(sk, NULL, "Control mb - queueing to control queue");
#ifdef SDP_ZCOPY
        if (h->mid == SDP_MID_SRCAVAIL_CANCEL) {
            sdp_dbg_data(sk, "Got SrcAvailCancel. "
                "seq: 0x%x seq_ack: 0x%x\n",
                ntohl(h->mseq), ntohl(h->mseq_ack));
            ssk->srcavail_cancel_mseq = ntohl(h->mseq);
        }

        if (h->mid == SDP_MID_RDMARDCOMPL) {
            struct sdp_rrch *rrch = (struct sdp_rrch *)(h+1);

            sdp_dbg_data(sk, "RdmaRdCompl message arrived\n");
            sdp_handle_rdma_read_compl(ssk, ntohl(h->mseq_ack),
                ntohl(rrch->len));
        }
#endif
        mb->m_nextpkt = NULL;
        if (ssk->rx_ctl_tail)
            ssk->rx_ctl_tail->m_nextpkt = mb;
        else
            ssk->rx_ctl_q = mb;
        ssk->rx_ctl_tail = mb;

        return 0;
    }

    sdp_prf1(sk, NULL, "queueing %s mb\n", mid2str(h->mid));
    mb = sdp_sock_queue_rcv_mb(sk, mb);

    return 0;
}

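/*
 * Validate one receive work completion and turn it back into an mbuf
 * chain: the chain is trimmed to the byte count the HCA actually wrote
 * (wc->byte_len) and the header sequence number is cross-checked against
 * the work request id.
 */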
/* called only from irq */
static struct mbuf *
sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
{
    struct mbuf *mb;
    struct sdp_bsdh *h;
    struct socket *sk = ssk->socket;
    int mseq;

    mb = sdp_recv_completion(ssk, wc->wr_id);
    if (unlikely(!mb))
        return NULL;

    if (unlikely(wc->status)) {
        if (ssk->qp_active && sk) {
            sdp_dbg(sk, "Recv completion with error. "
                "Status %d, vendor: %d\n",
                wc->status, wc->vendor_err);
            sdp_notify(ssk, ECONNRESET);
            ssk->qp_active = 0;
        }
        m_freem(mb);
        return NULL;
    }

    sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
        (int)wc->wr_id, wc->byte_len);
    if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
        sdp_warn(sk, "SDP BUG! byte_len %d < %zd\n",
            wc->byte_len, sizeof(struct sdp_bsdh));
        m_freem(mb);
        return NULL;
    }
    /* Use m_adj to trim the tail of data we didn't use. */
    m_adj(mb, -(mb->m_pkthdr.len - wc->byte_len));
    h = mtod(mb, struct sdp_bsdh *);

    SDP_DUMP_PACKET(ssk->socket, "RX", mb, h);

    ssk->rx_packets++;
    ssk->rx_bytes += mb->m_pkthdr.len;

    mseq = ntohl(h->mseq);
    atomic_set(&ssk->mseq_ack, mseq);
    if (mseq != (int)wc->wr_id)
        sdp_warn(sk, "SDP BUG! mseq %d != wrid %d\n",
            mseq, (int)wc->wr_id);

    return mb;
}

/* Wakeup writers if we now have credits. */
static void
sdp_bzcopy_write_space(struct sdp_sock *ssk)
{
    struct socket *sk = ssk->socket;

    if (tx_credits(ssk) >= ssk->min_bufs && sk)
        sowwakeup(sk);
}

/* only from interrupt. */
static int
sdp_poll_rx_cq(struct sdp_sock *ssk)
{
    struct ib_cq *cq = ssk->rx_ring.cq;
    struct ib_wc ibwc[SDP_NUM_WC];
    int n, i;
    int wc_processed = 0;
    struct mbuf *mb;

    do {
        n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
        for (i = 0; i < n; ++i) {
            struct ib_wc *wc = &ibwc[i];

            BUG_ON(!(wc->wr_id & SDP_OP_RECV));
            mb = sdp_process_rx_wc(ssk, wc);
            if (!mb)
                continue;
            sdp_process_rx_mb(ssk, mb);
            wc_processed++;
        }
    } while (n == SDP_NUM_WC);

    if (wc_processed)
        sdp_bzcopy_write_space(ssk);

    return wc_processed;
}

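/*
 * Deferred RX work, run on rx_comp_wq with the socket write lock held.
 * The interrupt path only polls the CQ; everything that needs the lock
 * (draining queued control messages, reposting receives, pushing credit
 * updates) is funneled through sdp_do_posts() from here.
 */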
static void
sdp_rx_comp_work(struct work_struct *work)
{
    struct sdp_sock *ssk = container_of(work, struct sdp_sock,
        rx_comp_work);

    sdp_prf(ssk->socket, NULL, "%s", __func__);

    SDP_WLOCK(ssk);
    if (unlikely(!ssk->qp)) {
        sdp_prf(ssk->socket, NULL, "qp was destroyed");
        goto out;
    }
    if (unlikely(!ssk->rx_ring.cq)) {
        sdp_prf(ssk->socket, NULL, "rx_ring.cq is NULL");
        goto out;
    }
    if (unlikely(!ssk->poll_cq)) {
        struct rdma_cm_id *id = ssk->id;

        if (id != NULL)
            rdma_notify(id, IB_EVENT_COMM_EST);
        goto out;
    }
    sdp_do_posts(ssk);
out:
    SDP_WUNLOCK(ssk);
}

void
sdp_do_posts(struct sdp_sock *ssk)
{
    struct socket *sk = ssk->socket;
    int xmit_poll_force;
    struct mbuf *mb;

    SDP_WLOCK_ASSERT(ssk);
    if (!ssk->qp_active) {
        sdp_dbg(sk, "QP is deactivated\n");
        return;
    }

    while ((mb = ssk->rx_ctl_q)) {
        ssk->rx_ctl_q = mb->m_nextpkt;
        mb->m_nextpkt = NULL;
        sdp_process_rx_ctl_mb(ssk, mb);
    }

    if (ssk->state == TCPS_TIME_WAIT)
        return;

    if (!ssk->rx_ring.cq || !ssk->tx_ring.cq)
        return;

    sdp_post_recvs(ssk);

    if (tx_ring_posted(ssk))
        sdp_xmit_poll(ssk, 1);

    sdp_post_sends(ssk, M_NOWAIT);

    xmit_poll_force = tx_credits(ssk) < SDP_MIN_TX_CREDITS;

    if (credit_update_needed(ssk) || xmit_poll_force) {
        /* if there is pending tx stalled on tx_credits, transmit it */
        sdp_prf(sk, NULL, "Processing to free pending sends");
        sdp_xmit_poll(ssk, xmit_poll_force);
        sdp_prf(sk, NULL, "Sending credit update");
        sdp_post_sends(ssk, M_NOWAIT);
    }
}

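/*
 * Poll the RX CQ from interrupt context.  The ring lock is only tried,
 * never slept on, so a racing teardown simply skips the poll.  Any work
 * found is handed off to rx_comp_wq, and the CQ is re-armed before the
 * lock is released.
 */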
int
sdp_process_rx(struct sdp_sock *ssk)
{
    int wc_processed = 0;
    int credits_before;

    if (!rx_ring_trylock(&ssk->rx_ring)) {
        sdp_dbg(ssk->socket, "ring destroyed. not polling it\n");
        return 0;
    }

    credits_before = tx_credits(ssk);

    wc_processed = sdp_poll_rx_cq(ssk);
    sdp_prf(ssk->socket, NULL, "processed %d", wc_processed);

    if (wc_processed) {
        sdp_prf(ssk->socket, NULL, "credits: %d -> %d",
            credits_before, tx_credits(ssk));
        queue_work(rx_comp_wq, &ssk->rx_comp_work);
    }
    sdp_arm_rx_cq(ssk);

    rx_ring_unlock(&ssk->rx_ring);

    return (wc_processed);
}

static void
sdp_rx_irq(struct ib_cq *cq, void *cq_context)
{
    struct socket *sk = cq_context;
    struct sdp_sock *ssk = sdp_sk(sk);

    if (cq != ssk->rx_ring.cq) {
        sdp_dbg(sk, "cq = %p, ssk->cq = %p\n", cq, ssk->rx_ring.cq);
        return;
    }

    SDPSTATS_COUNTER_INC(rx_int_count);

    sdp_prf(sk, NULL, "rx irq");

    sdp_process_rx(ssk);
}

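/*
 * Drain every still-posted receive during teardown, unmapping and freeing
 * the associated mbuf chains through the normal completion helper.
 */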
static void
sdp_rx_ring_purge(struct sdp_sock *ssk)
{
    while (rx_ring_posted(ssk) > 0) {
        struct mbuf *mb;

        mb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
        if (mb == NULL)
            break;
        m_freem(mb);
    }
}

void
sdp_rx_ring_init(struct sdp_sock *ssk)
{
    ssk->rx_ring.buffer = NULL;
    ssk->rx_ring.destroyed = 0;
    rw_init(&ssk->rx_ring.destroyed_lock, "sdp rx lock");
}

static void
sdp_rx_cq_event_handler(struct ib_event *event, void *data)
{
}

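/*
 * Allocate the receive ring and its completion queue.  Head and tail start
 * at 1 rather than 0, apparently so that the first receive's wr_id is
 * never zero and lines up with the initial packet sequence number.
 */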
int
sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
{
    struct ib_cq *rx_cq;
    int rc = 0;

    sdp_dbg(ssk->socket, "rx ring created");
    INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
    atomic_set(&ssk->rx_ring.head, 1);
    atomic_set(&ssk->rx_ring.tail, 1);

    ssk->rx_ring.buffer = kmalloc(
        sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE, GFP_KERNEL);
    if (ssk->rx_ring.buffer == NULL) {
        sdp_warn(ssk->socket,
            "Unable to allocate RX Ring size %zd.\n",
            sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE);
        return -ENOMEM;
    }

    rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler,
        ssk->socket, SDP_RX_SIZE, IB_CQ_VECTOR_LEAST_ATTACHED);
    if (IS_ERR(rx_cq)) {
        rc = PTR_ERR(rx_cq);
        sdp_warn(ssk->socket, "Unable to allocate RX CQ: %d.\n", rc);
        goto err_cq;
    }

    sdp_sk(ssk->socket)->rx_ring.cq = rx_cq;
    sdp_arm_rx_cq(ssk);

    return 0;

err_cq:
    kfree(ssk->rx_ring.buffer);
    ssk->rx_ring.buffer = NULL;
    return rc;
}

void
sdp_rx_ring_destroy(struct sdp_sock *ssk)
{

    cancel_work_sync(&ssk->rx_comp_work);
    rx_ring_destroy_lock(&ssk->rx_ring);

    if (ssk->rx_ring.buffer) {
        sdp_rx_ring_purge(ssk);
        kfree(ssk->rx_ring.buffer);
        ssk->rx_ring.buffer = NULL;
    }

    if (ssk->rx_ring.cq) {
        if (ib_destroy_cq(ssk->rx_ring.cq)) {
            sdp_warn(ssk->socket, "destroy cq(%p) failed\n",
                ssk->rx_ring.cq);
        } else {
            ssk->rx_ring.cq = NULL;
        }
    }

    WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));
}