2 * Copyright (c) 2009 Mellanox Technologies Ltd. All rights reserved.
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
/* Tunable module parameters controlling SDP receive-side buffering. */
SDP_MODPARAM_INT(rcvbuf_initial_size, 32 * 1024,
"Receive buffer initial size in bytes.");
/* Multiplier used by sdp_post_recvs_needed() to cap in-flight RX bytes. */
SDP_MODPARAM_SINT(rcvbuf_scale, 0x8,
"Receive buffer size scale factor.");
/* Like tcp_fin - called when SDP_MID_DISCONNECT is received */
/*
 * Handle a peer disconnect message: stop further receives on the
 * socket and advance the TCP-style connection state machine.
 * Caller must hold the SDP socket write lock (asserted below).
 */
sdp_handle_disconn(struct sdp_sock *ssk)
sdp_dbg(ssk->socket, "%s\n", __func__);
SDP_WLOCK_ASSERT(ssk);
/* First FIN seen: mark the socket unable to receive more data. */
if (TCPS_HAVERCVDFIN(ssk->state) == 0)
socantrcvmore(ssk->socket);
case TCPS_SYN_RECEIVED:
case TCPS_ESTABLISHED:
/* Remote close of an open connection: enter CLOSE_WAIT. */
ssk->state = TCPS_CLOSE_WAIT;
/* Received a reply FIN - start Infiniband tear down */
"%s: Starting Infiniband tear down sending DREQ\n",
sdp_cancel_dreq_wait_timeout(ssk);
struct rdma_cm_id *id;
"%s: ssk->id is NULL\n", __func__);
/* This is a mutual close situation and we've got the DREQ from
the peer before the SDP_MID_DISCONNECT */
/* FIN arrived after IB teardown started - do nothing */
sdp_dbg(ssk->socket, "%s: fin in state %s\n",
__func__, sdp_state_str(ssk->state));
/* A FIN in any other state is unexpected; log it for diagnosis. */
"%s: FIN in unexpected state. state=%d\n",
__func__, ssk->state);
/*
 * Allocate an mbuf chain of ssk->recv_bytes, DMA-map each mbuf into a
 * scatter/gather list, and post it as a single receive work request on
 * the QP.  The ring-head index is encoded in the wr_id (OR'ed with
 * SDP_OP_RECV) so the completion handler can find the sdp_buf again.
 */
sdp_post_recv(struct sdp_sock *ssk)
struct sdp_buf *rx_req;
struct ib_device *dev;
struct ib_recv_wr rx_wr = { NULL };
struct ib_sge ibsge[SDP_MAX_RECV_SGES];
struct ib_sge *sge = ibsge;
struct ib_recv_wr *bad_wr;
int id = ring_head(ssk->rx_ring);
/* Now, allocate and repost recv */
sdp_prf(ssk->socket, mb, "Posting mb");
mb = m_getm2(NULL, ssk->recv_bytes, M_NOWAIT, MT_DATA, M_PKTHDR);
/* Retry so we can't stall out with no memory. */
/* Nothing posted at all: reschedule the completion worker to retry. */
if (!rx_ring_posted(ssk))
queue_work(rx_comp_wq, &ssk->rx_comp_work);
/* Expand each mbuf to full size and accumulate the packet length. */
for (m = mb; m != NULL; m = m->m_next) {
m->m_len = M_SIZE(m);
mb->m_pkthdr.len += m->m_len;
h = mtod(mb, struct sdp_bsdh *);
/* Ring slot for this post; SDP_RX_SIZE is a power of two. */
rx_req = ssk->rx_ring.buffer + (id & (SDP_RX_SIZE - 1));
dev = ssk->ib_device;
/* DMA-map every mbuf in the chain into one SGE each. */
for (i = 0; mb != NULL; i++, mb = mb->m_next, sge++) {
addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
/* TODO: proper error handling */
BUG_ON(ib_dma_mapping_error(dev, addr));
BUG_ON(i >= SDP_MAX_RECV_SGES);
/* Remember the mapping so it can be unmapped on completion. */
rx_req->mapping[i] = addr;
sge->length = mb->m_len;
sge->lkey = ssk->sdp_dev->mr->lkey;
rx_wr.wr_id = id | SDP_OP_RECV;
rx_wr.sg_list = ibsge;
rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr);
/* Post failed: unmap the buffer and reset the connection. */
sdp_warn(ssk->socket, "ib_post_recv failed. status %d\n", rc);
sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
sdp_notify(ssk, ECONNRESET);
atomic_inc(&ssk->rx_ring.head);
SDPSTATS_COUNTER_INC(post_recv);
/*
 * Decide whether another receive buffer should be posted: true while
 * the bytes tied up in posted receives plus data already queued in the
 * socket receive buffer stay below a scaled high-water mark.
 */
sdp_post_recvs_needed(struct sdp_sock *ssk)
unsigned long bytes_in_process;
unsigned long max_bytes;
/* No QP or no socket: nothing to post against. */
if (!ssk->qp_active || !ssk->socket)
posted = rx_ring_posted(ssk);
/* Ring already full. */
if (posted >= SDP_RX_SIZE)
/* Always keep at least the minimum credit count posted. */
if (posted < SDP_MIN_TX_CREDITS)
buffer_size = ssk->recv_bytes;
/* Ceiling: the larger of the send high-water mark and the
   minimum-credit worth of buffers, scaled by rcvbuf_scale. */
max_bytes = max(ssk->socket->so_snd.sb_hiwat,
(1 + SDP_MIN_TX_CREDITS) * buffer_size);
max_bytes *= rcvbuf_scale;
* Compute bytes in the receive queue and socket buffer.
bytes_in_process = (posted - SDP_MIN_TX_CREDITS) * buffer_size;
bytes_in_process += sbused(&ssk->socket->so_rcv);
return bytes_in_process < max_bytes;
/* Keep posting receive buffers until the budget check says stop
   or a post attempt fails. */
sdp_post_recvs(struct sdp_sock *ssk)
while (sdp_post_recvs_needed(ssk))
if (sdp_post_recv(ssk))
/*
 * Queue a received data mbuf onto the socket receive buffer.
 * SrcAvail messages additionally set up rx_srcavail_state so the
 * payload can later be fetched via RDMA read.  Advances rcv_nxt and
 * wakes any reader sleeping on the socket.
 */
static inline struct mbuf *
sdp_sock_queue_rcv_mb(struct socket *sk, struct mbuf *mb)
struct sdp_sock *ssk = sdp_sk(sk);
h = mtod(mb, struct sdp_bsdh *);
/* Stamp the mbuf with the next expected receive sequence. */
SDP_SKB_CB(mb)->seq = rcv_nxt(ssk);
if (h->mid == SDP_MID_SRCAVAIL) {
/* SrcAvail header follows immediately after the BSDH. */
struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
struct rx_srcavail_state *rx_sa;
ssk->srcavail_cancel_mseq = 0;
ssk->rx_sa = rx_sa = RX_SRCAVAIL_STATE(mb) = kzalloc(
sizeof(struct rx_srcavail_state), M_NOWAIT);
/* Record the remote buffer description for the RDMA read. */
rx_sa->mseq = ntohl(h->mseq);
rx_sa->len = mb_len = ntohl(srcah->len);
rx_sa->rkey = ntohl(srcah->rkey);
rx_sa->vaddr = be64_to_cpu(srcah->vaddr);
sdp_dbg_data(ssk->socket, "got RX SrcAvail while waiting "
"for TX SrcAvail. waking up TX SrcAvail"
wake_up(sk->sk_sleep);
atomic_add(mb->len, &ssk->rcv_nxt);
sdp_dbg_data(sk, "queueing SrcAvail. mb_len = %d vaddr = %lld\n",
mb_len, rx_sa->vaddr);
/* Plain data: advance rcv_nxt by the whole packet length. */
atomic_add(mb->m_pkthdr.len, &ssk->rcv_nxt);
/* Strip the SDP header before handing data to the socket layer. */
m_adj(mb, SDP_HEAD_SIZE);
SOCKBUF_LOCK(&sk->so_rcv);
if (unlikely(h->flags & SDP_OOB_PRES))
sbappend_locked(&sk->so_rcv, mb, 0);
sorwakeup_locked(sk);
/* Clamp a requested receive buffer size to the SDP packet maximum. */
sdp_get_recv_bytes(struct sdp_sock *ssk, u32 new_size)
return MIN(new_size, SDP_MAX_PACKET);
/* Set the initial per-receive buffer size for a new connection. */
sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
/*
 * Grow the receive buffer size.  Only growth (up to SDP_MAX_PACKET)
 * is honored; shrink or out-of-range requests leave recv_bytes alone.
 */
sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
u32 curr_size = ssk->recv_bytes;
u32 max_size = SDP_MAX_PACKET;
if (new_size > curr_size && new_size <= max_size) {
ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
/*
 * Peer asked us to change our receive buffer size (CHRCVBUF).
 * On success, ack only after every receive posted with the old size
 * has completed (head + 1); on failure, ack immediately (tail).
 */
sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
ssk->recv_request_head = ring_head(ssk->rx_ring) + 1;
ssk->recv_request_head = ring_tail(ssk->rx_ring);
/* Flag that a CHRCVBUF_ACK needs to be sent. */
ssk->recv_request = 1;
/* Peer acknowledged a buffer resize: raise our transmit size goal,
   never lower it. */
sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
u32 new_size = ntohl(buf->size);
if (new_size > ssk->xmit_size_goal)
ssk->xmit_size_goal = new_size;
/*
 * Reclaim the receive buffer for a completed work request.  Completions
 * must arrive in ring order; a wr_id that does not match the ring tail
 * is rejected as bogus.  Unmaps the DMA buffer and advances the tail.
 */
sdp_recv_completion(struct sdp_sock *ssk, int id)
struct sdp_buf *rx_req;
struct ib_device *dev;
if (unlikely(id != ring_tail(ssk->rx_ring))) {
printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
id, ring_tail(ssk->rx_ring));
dev = ssk->ib_device;
rx_req = &ssk->rx_ring.buffer[id & (SDP_RX_SIZE - 1)];
sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
atomic_inc(&ssk->rx_ring.tail);
/* One fewer credit advertised to the remote side. */
atomic_dec(&ssk->remote_credits);
/* socket lock should be taken before calling this */
/*
 * Dispatch a control message (non-data MID) that was queued by the
 * interrupt path.  Runs under the SDP socket write lock.
 */
sdp_process_rx_ctl_mb(struct sdp_sock *ssk, struct mbuf *mb)
SDP_WLOCK_ASSERT(ssk);
h = mtod(mb, struct sdp_bsdh *);
case SDP_MID_SRCAVAIL:
/* Data arriving after receive shutdown is a protocol violation. */
sdp_dbg(sk, "DATA after socket rcv was shutdown\n");
/* got data in RCV_SHUTDOWN */
if (ssk->state == TCPS_FIN_WAIT_1) {
sdp_dbg(sk, "RX data when state = FIN_WAIT1\n");
sdp_notify(ssk, ECONNRESET);
case SDP_MID_RDMARDCOMPL:
sdp_handle_sendsm(ssk, ntohl(h->mseq_ack));
case SDP_MID_SRCAVAIL_CANCEL:
sdp_dbg_data(sk, "Handling SrcAvailCancel\n");
sdp_prf(sk, NULL, "Handling SrcAvailCancel");
/* Abort the in-flight SrcAvail, if any. */
ssk->srcavail_cancel_mseq = ntohl(h->mseq);
ssk->rx_sa->flags |= RX_SA_ABORTED;
ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get
the dirty logic from recvmsg */
sdp_dbg(sk, "Got SrcAvailCancel - "
"but no SrcAvail in process\n");
case SDP_MID_SINKAVAIL:
/* SinkAvail is not implemented; drop it. */
sdp_dbg_data(sk, "Got SinkAvail - not supported: ignored\n");
sdp_prf(sk, NULL, "Got SinkAvail - not supported: ignored");
sdp_dbg_data(sk, "Handling ABORT\n");
sdp_prf(sk, NULL, "Handling ABORT");
sdp_notify(ssk, ECONNRESET);
case SDP_MID_DISCONN:
sdp_dbg_data(sk, "Handling DISCONN\n");
sdp_prf(sk, NULL, "Handling DISCONN");
sdp_handle_disconn(ssk);
case SDP_MID_CHRCVBUF:
sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
sdp_handle_resize_request(ssk, (struct sdp_chrecvbuf *)(h+1));
case SDP_MID_CHRCVBUF_ACK:
sdp_dbg_data(sk, "Handling RX CHRCVBUF_ACK\n");
sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)(h+1));
/* TODO: Handle other messages */
sdp_warn(sk, "SDP: FIXME MID %d\n", h->mid);
/*
 * First-stage processing of a received mbuf: apply the piggybacked
 * credit update from the BSDH, then either queue data mbufs onto the
 * socket receive buffer or put control messages on the rxctlq for the
 * locked second stage (sdp_process_rx_ctl_mb).
 */
sdp_process_rx_mb(struct sdp_sock *ssk, struct mbuf *mb)
unsigned long mseq_ack;
h = mtod(mb, struct sdp_bsdh *);
* If another thread is in so_pcbfree this may be partially torn
* down but no further synchronization is required as the destroying
* thread will wait for receive to shutdown before discarding the
SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));
/* Recompute our TX credits from the peer's ack sequence. */
mseq_ack = ntohl(h->mseq_ack);
credits_before = tx_credits(ssk);
atomic_set(&ssk->tx_ring.credits, mseq_ack - ring_head(ssk->tx_ring) +
if (mseq_ack >= ssk->nagle_last_unacked)
ssk->nagle_last_unacked = 0;
sdp_prf1(ssk->socket, mb, "RX %s +%d c:%d->%d mseq:%d ack:%d\n",
mid2str(h->mid), ntohs(h->bufs), credits_before,
tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));
/* Header-only DATA message carries nothing but a credit update. */
if (unlikely(h->mid == SDP_MID_DATA &&
mb->m_pkthdr.len == SDP_HEAD_SIZE)) {
/* Credit update is valid even after RCV_SHUTDOWN */
/* Control messages (or anything after FIN) go to the ctl queue. */
if ((h->mid != SDP_MID_DATA && h->mid != SDP_MID_SRCAVAIL) ||
TCPS_HAVERCVDFIN(ssk->state)) {
sdp_prf(sk, NULL, "Control mb - queing to control queue");
if (h->mid == SDP_MID_SRCAVAIL_CANCEL) {
sdp_dbg_data(sk, "Got SrcAvailCancel. "
"seq: 0x%d seq_ack: 0x%d\n",
ntohl(h->mseq), ntohl(h->mseq_ack));
/* Record the cancel early so recvmsg can notice it. */
ssk->srcavail_cancel_mseq = ntohl(h->mseq);
if (h->mid == SDP_MID_RDMARDCOMPL) {
struct sdp_rrch *rrch = (struct sdp_rrch *)(h+1);
sdp_dbg_data(sk, "RdmaRdCompl message arrived\n");
sdp_handle_rdma_read_compl(ssk, ntohl(h->mseq_ack),
if (mbufq_enqueue(&ssk->rxctlq, mb) != 0)
/* Data path: append to the socket receive buffer. */
sdp_prf1(sk, NULL, "queueing %s mb\n", mid2str(h->mid));
mb = sdp_sock_queue_rcv_mb(sk, mb);
/* called only from irq */
/*
 * Turn a receive work completion into an mbuf: reclaim the posted
 * buffer, validate status and length, trim the mbuf to the number of
 * bytes actually received, and record the peer's message sequence.
 */
sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
struct socket *sk = ssk->socket;
mb = sdp_recv_completion(ssk, wc->wr_id);
/* Completion with error status: log only if still active. */
if (unlikely(wc->status)) {
if (ssk->qp_active && sk) {
sdp_dbg(sk, "Recv completion with error. "
"Status %d, vendor: %d\n",
wc->status, wc->vendor_err);
sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
(int)wc->wr_id, wc->byte_len);
/* Every SDP message must at least carry a BSDH header. */
if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
sdp_warn(sk, "SDP BUG! byte_len %d < %zd\n",
wc->byte_len, sizeof(struct sdp_bsdh));
/* Use m_adj to trim the tail of data we didn't use. */
m_adj(mb, -(mb->m_pkthdr.len - wc->byte_len));
h = mtod(mb, struct sdp_bsdh *);
SDP_DUMP_PACKET(ssk->socket, "RX", mb, h);
ssk->rx_bytes += mb->m_pkthdr.len;
mseq = ntohl(h->mseq);
atomic_set(&ssk->mseq_ack, mseq);
/* The wire sequence should match the ring slot it landed in. */
if (mseq != (int)wc->wr_id)
sdp_warn(sk, "SDP BUG! mseq %d != wrid %d\n",
mseq, (int)wc->wr_id);
/* Wakeup writers if we now have credits. */
sdp_bzcopy_write_space(struct sdp_sock *ssk)
struct socket *sk = ssk->socket;
/* Enough TX credits accumulated: let blocked writers proceed. */
if (tx_credits(ssk) >= ssk->min_bufs && sk)
/* only from interrupt. */
/*
 * Drain the receive completion queue in batches of SDP_NUM_WC,
 * processing each completion into an mbuf and then into the socket.
 * Loops until a poll returns fewer than SDP_NUM_WC entries.
 */
sdp_poll_rx_cq(struct sdp_sock *ssk)
struct ib_cq *cq = ssk->rx_ring.cq;
struct ib_wc ibwc[SDP_NUM_WC];
int wc_processed = 0;
n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
for (i = 0; i < n; ++i) {
struct ib_wc *wc = &ibwc[i];
/* Only receive completions are expected on this CQ. */
BUG_ON(!(wc->wr_id & SDP_OP_RECV));
mb = sdp_process_rx_wc(ssk, wc);
sdp_process_rx_mb(ssk, mb);
} while (n == SDP_NUM_WC);
/* Credits may have been refreshed; wake blocked writers. */
sdp_bzcopy_write_space(ssk);
/*
 * Deferred RX completion handler, run from the rx_comp_wq workqueue.
 * Bails out if the QP or RX CQ has already been torn down; if polling
 * is disabled, notifies the CM that communication is established.
 */
sdp_rx_comp_work(struct work_struct *work)
struct sdp_sock *ssk = container_of(work, struct sdp_sock,
sdp_prf(ssk->socket, NULL, "%s", __func__);
if (unlikely(!ssk->qp)) {
sdp_prf(ssk->socket, NULL, "qp was destroyed");
if (unlikely(!ssk->rx_ring.cq)) {
sdp_prf(ssk->socket, NULL, "rx_ring.cq is NULL");
if (unlikely(!ssk->poll_cq)) {
struct rdma_cm_id *id = ssk->id;
rdma_notify(id, IB_EVENT_COMM_EST);
/*
 * Post-processing done under the socket write lock: dispatch queued
 * control messages, reap finished sends, post pending sends, and send
 * a credit update when needed.
 */
sdp_do_posts(struct sdp_sock *ssk)
struct socket *sk = ssk->socket;
SDP_WLOCK_ASSERT(ssk);
if (!ssk->qp_active) {
sdp_dbg(sk, "QP is deactivated\n");
/* Drain control messages queued by the interrupt path. */
while ((mb = mbufq_dequeue(&ssk->rxctlq)) != NULL)
sdp_process_rx_ctl_mb(ssk, mb);
if (ssk->state == TCPS_TIME_WAIT)
if (!ssk->rx_ring.cq || !ssk->tx_ring.cq)
/* Reap completed transmits before posting new work. */
if (tx_ring_posted(ssk))
sdp_xmit_poll(ssk, 1);
sdp_post_sends(ssk, M_NOWAIT);
/* Force a TX poll when credits have run dangerously low. */
xmit_poll_force = tx_credits(ssk) < SDP_MIN_TX_CREDITS;
if (credit_update_needed(ssk) || xmit_poll_force) {
/* if has pending tx because run out of tx_credits - xmit it */
sdp_prf(sk, NULL, "Processing to free pending sends");
sdp_xmit_poll(ssk, xmit_poll_force);
sdp_prf(sk, NULL, "Sending credit update");
sdp_post_sends(ssk, M_NOWAIT);
/*
 * Top-level RX entry: take the ring lock, poll the RX CQ, and defer
 * the follow-up (locked) work to the completion workqueue.  Returns
 * the number of completions processed, or skips polling entirely if
 * the ring is being destroyed.
 */
sdp_process_rx(struct sdp_sock *ssk)
int wc_processed = 0;
if (!rx_ring_trylock(&ssk->rx_ring)) {
sdp_dbg(ssk->socket, "ring destroyed. not polling it\n");
credits_before = tx_credits(ssk);
wc_processed = sdp_poll_rx_cq(ssk);
sdp_prf(ssk->socket, NULL, "processed %d", wc_processed);
sdp_prf(ssk->socket, NULL, "credits: %d -> %d",
credits_before, tx_credits(ssk));
/* Defer socket-locked processing to the workqueue. */
queue_work(rx_comp_wq, &ssk->rx_comp_work);
rx_ring_unlock(&ssk->rx_ring);
return (wc_processed);
/*
 * RX completion-queue interrupt handler registered with ib_create_cq;
 * cq_context carries the owning sdp_sock.
 */
sdp_rx_irq(struct ib_cq *cq, void *cq_context)
struct sdp_sock *ssk;
KASSERT(cq == ssk->rx_ring.cq, ("%s: mismatched cq on %p", ssk));
SDPSTATS_COUNTER_INC(rx_int_count);
sdp_prf(sk, NULL, "rx irq");
/* Reclaim every still-posted receive buffer during teardown. */
void sdp_rx_ring_purge(struct sdp_sock *ssk)
while (rx_ring_posted(ssk) > 0) {
mb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
/* Reset the RX ring to its pristine, unallocated state. */
sdp_rx_ring_init(struct sdp_sock *ssk)
ssk->rx_ring.buffer = NULL;
ssk->rx_ring.destroyed = 0;
rw_init(&ssk->rx_ring.destroyed_lock, "sdp rx lock");
701 sdp_rx_cq_event_handler(struct ib_event *event, void *data)
/*
 * Allocate the RX ring buffer array and create the RX completion
 * queue.  On CQ creation failure the buffer is freed again so the
 * ring is left in a consistent, empty state.
 */
sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
sdp_dbg(ssk->socket, "rx ring created");
INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
/* Head/tail start at 1 so "empty" differs from "never used". */
atomic_set(&ssk->rx_ring.head, 1);
atomic_set(&ssk->rx_ring.tail, 1);
ssk->rx_ring.buffer = kmalloc(
sizeof *ssk->rx_ring.buffer * SDP_RX_SIZE, GFP_KERNEL);
if (!ssk->rx_ring.buffer) {
sdp_warn(ssk->socket,
"Unable to allocate RX Ring size %zd.\n",
sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE);
rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler,
ssk, SDP_RX_SIZE, 0);
sdp_warn(ssk->socket, "Unable to allocate RX CQ: %d.\n", rc);
sdp_sk(ssk->socket)->rx_ring.cq = rx_cq;
/* Error path: release the ring buffer allocated above. */
kfree(ssk->rx_ring.buffer);
ssk->rx_ring.buffer = NULL;
/*
 * Tear down the RX ring: stop the completion worker, purge and free
 * posted buffers, and destroy the CQ.  Finishes by asserting that no
 * receives remain outstanding (head == tail).
 */
sdp_rx_ring_destroy(struct sdp_sock *ssk)
cancel_work_sync(&ssk->rx_comp_work);
rx_ring_destroy_lock(&ssk->rx_ring);
if (ssk->rx_ring.buffer) {
/* Unmap and free every receive still posted on the ring. */
sdp_rx_ring_purge(ssk);
kfree(ssk->rx_ring.buffer);
ssk->rx_ring.buffer = NULL;
if (ssk->rx_ring.cq) {
if (ib_destroy_cq(ssk->rx_ring.cq)) {
sdp_warn(ssk->socket, "destroy cq(%p) failed\n",
ssk->rx_ring.cq = NULL;
WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));