/*
 * Copyright (c) 2009 Mellanox Technologies Ltd. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include "sdp.h"

#define sdp_cnt(var) do { (var)++; } while (0)

SDP_MODPARAM_SINT(sdp_keepalive_probes_sent, 0,
    "Total number of keepalive probes sent.");

static int sdp_process_tx_cq(struct sdp_sock *ssk);
static void sdp_poll_tx_timeout(void *data);
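
/*
 * Poll for send completions on behalf of the transmit path.  Completions
 * are reaped only every SDP_TX_POLL_MODER posts (or when forced), and a
 * timer is armed to pick up the stragglers if the interface goes idle.
 * Returns the number of work completions processed.
 */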
int
sdp_xmit_poll(struct sdp_sock *ssk, int force)
{
	int wc_processed = 0;

	SDP_WLOCK_ASSERT(ssk);
	sdp_prf(ssk->socket, NULL, "%s", __func__);

	/* If we don't have a pending timer, set one up to catch our recent
	   post in case the interface becomes idle */
	if (!callout_pending(&ssk->tx_ring.timer))
		callout_reset(&ssk->tx_ring.timer, SDP_TX_POLL_TIMEOUT,
		    sdp_poll_tx_timeout, ssk);

	/* Poll the CQ every SDP_TX_POLL_MODER packets */
	if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
		wc_processed = sdp_process_tx_cq(ssk);

	return (wc_processed);
}
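
/*
 * Post one mbuf chain as a single IB send work request.  Fills in the
 * BSDH header (credits, length, sequence numbers), DMA-maps every mbuf
 * in the chain into its own scatter/gather entry, and hands the WR to
 * the QP.  On failure the mapping is torn down and the connection is
 * reset.
 */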
void
sdp_post_send(struct sdp_sock *ssk, struct mbuf *mb)
{
	struct sdp_buf *tx_req;
	struct sdp_bsdh *h;
	unsigned long mseq;
	struct ib_device *dev;
	struct ib_send_wr *bad_wr;
	struct ib_sge ibsge[SDP_MAX_SEND_SGES];
	struct ib_sge *sge;
	struct ib_send_wr tx_wr = { NULL };
	int i, rc;
	u64 addr;

	if (!ssk->qp_active) {
		m_freem(mb);
		return;
	}

	mseq = ring_head(ssk->tx_ring);
	h = mtod(mb, struct sdp_bsdh *);
	SDPSTATS_COUNTER_MID_INC(post_send, h->mid);
	SDPSTATS_HIST(send_size, mb->m_pkthdr.len);
	ssk->tx_packets++;
	ssk->tx_bytes += mb->m_pkthdr.len;

#ifdef SDP_ZCOPY
	if (unlikely(h->mid == SDP_MID_SRCAVAIL)) {
		struct tx_srcavail_state *tx_sa = TX_SRCAVAIL_STATE(mb);
		if (ssk->tx_sa != tx_sa) {
			sdp_dbg_data(ssk->socket, "SrcAvail cancelled "
			    "before being sent!\n");
			WARN_ON(1);
			m_freem(mb);
			return;
		}
		TX_SRCAVAIL_STATE(mb)->mseq = mseq;
	}
#endif

	if (unlikely(mb->m_flags & M_URG))
		h->flags = SDP_OOB_PRES | SDP_OOB_PEND;
	else
		h->flags = 0;

	mb->m_flags |= M_RDONLY; /* Don't allow compression once sent. */
	h->bufs = htons(rx_ring_posted(ssk));
	h->len = htonl(mb->m_pkthdr.len);
	h->mseq = htonl(mseq);
	h->mseq_ack = htonl(mseq_ack(ssk));

	sdp_prf1(ssk->socket, mb, "TX: %s bufs: %d mseq:%ld ack:%d",
	    mid2str(h->mid), rx_ring_posted(ssk), mseq,
	    ntohl(h->mseq_ack));

	SDP_DUMP_PACKET(ssk->socket, "TX", mb, h);

	tx_req = &ssk->tx_ring.buffer[mseq & (SDP_TX_SIZE - 1)];
	tx_req->mb = mb;
	dev = ssk->ib_device;
	sge = &ibsge[0];
	for (i = 0; mb != NULL; i++, mb = mb->m_next, sge++) {
		addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
		    DMA_TO_DEVICE);
		/* TODO: proper error handling */
		BUG_ON(ib_dma_mapping_error(dev, addr));
		BUG_ON(i >= SDP_MAX_SEND_SGES);
		tx_req->mapping[i] = addr;
		sge->addr = addr;
		sge->length = mb->m_len;
		sge->lkey = ssk->sdp_dev->mr->lkey;
	}
	tx_wr.wr_id = mseq | SDP_OP_SEND;
	tx_wr.sg_list = ibsge;
	tx_wr.num_sge = i;
	tx_wr.opcode = IB_WR_SEND;
	tx_wr.send_flags = IB_SEND_SIGNALED;
	if (unlikely(tx_req->mb->m_flags & M_URG))
		tx_wr.send_flags |= IB_SEND_SOLICITED;

	rc = ib_post_send(ssk->qp, &tx_wr, &bad_wr);
	if (unlikely(rc)) {
		sdp_dbg(ssk->socket,
		    "ib_post_send failed with status %d.\n", rc);
		sdp_cleanup_sdp_buf(ssk, tx_req, DMA_TO_DEVICE);
		sdp_notify(ssk, ECONNRESET);
		m_freem(tx_req->mb);
		return;
	}

	atomic_inc(&ssk->tx_ring.head);
	atomic_dec(&ssk->tx_ring.credits);
	atomic_set(&ssk->remote_credits, rx_ring_posted(ssk));
}
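
/*
 * Retire the TX ring slot matching a send completion.  Completions must
 * arrive in ring order; a mismatch against the tail is logged and the
 * completion is ignored.  Returns the mbuf that was posted so the caller
 * can free it.
 */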
static struct mbuf *
sdp_send_completion(struct sdp_sock *ssk, int mseq)
{
	struct ib_device *dev;
	struct sdp_buf *tx_req;
	struct mbuf *mb = NULL;
	struct sdp_tx_ring *tx_ring = &ssk->tx_ring;

	if (unlikely(mseq != ring_tail(*tx_ring))) {
		printk(KERN_WARNING "Bogus send completion id %d tail %d\n",
		    mseq, ring_tail(*tx_ring));
		goto out;
	}

	dev = ssk->ib_device;
	tx_req = &tx_ring->buffer[mseq & (SDP_TX_SIZE - 1)];
	mb = tx_req->mb;
	sdp_cleanup_sdp_buf(ssk, tx_req, DMA_TO_DEVICE);

#ifdef SDP_ZCOPY
	/* TODO: AIO and real zcopy code; add their context support here */
	if (BZCOPY_STATE(mb))
		BZCOPY_STATE(mb)->busy--;
#endif

	atomic_inc(&tx_ring->tail);

out:
	return (mb);
}
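
/*
 * Handle a completion for a data send: report errors (anything but a
 * flush during teardown resets the connection), then retire and free
 * the completed mbuf.
 */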
static int
sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
{
	struct mbuf *mb = NULL;
	struct sdp_bsdh *h;

	if (unlikely(wc->status)) {
		if (wc->status != IB_WC_WR_FLUSH_ERR) {
			sdp_prf(ssk->socket, mb, "Send completion with error. "
			    "Status %d", wc->status);
			sdp_dbg_data(ssk->socket, "Send completion with error. "
			    "Status %d\n", wc->status);
			sdp_notify(ssk, ECONNRESET);
		}
	}

	mb = sdp_send_completion(ssk, wc->wr_id);
	if (unlikely(mb == NULL))
		return (-1);

	h = mtod(mb, struct sdp_bsdh *);
	sdp_prf1(ssk->socket, mb, "tx completion. mseq:%d", ntohl(h->mseq));
	sdp_dbg(ssk->socket, "tx completion. %p %d mseq:%d",
	    mb, mb->m_pkthdr.len, ntohl(h->mseq));
	m_freem(mb);

	return (0);
}
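
/*
 * Dispatch one TX work completion.  Ordinary sends, RDMA reads (the
 * zero-copy SrcAvail path), and the keepalive probe are distinguished
 * by the tag bits encoded into wr_id at post time.
 */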
static inline void
sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
{

	if (likely(wc->wr_id & SDP_OP_SEND)) {
		sdp_handle_send_comp(ssk, wc);
		return;
	}

#ifdef SDP_ZCOPY
	if (wc->wr_id & SDP_OP_RDMA) {
		/* TODO: handle failed RDMA read cqe */

		sdp_dbg_data(ssk->socket,
		    "TX comp: RDMA read. status: %d\n", wc->status);
		sdp_prf1(ssk->socket, NULL, "TX comp: RDMA read");

		if (!ssk->tx_ring.rdma_inflight) {
			sdp_warn(ssk->socket, "ERROR: unexpected RDMA read\n");
			return;
		}

		if (!ssk->tx_ring.rdma_inflight->busy) {
			sdp_warn(ssk->socket,
			    "ERROR: too many RDMA read completions\n");
			return;
		}

		/* Only the last RDMA read WR is signalled, and ordering is
		 * guaranteed, so once it completes all earlier reads have
		 * completed as well. */
		ssk->tx_ring.rdma_inflight->busy = 0;
		sowwakeup(ssk->socket);
		sdp_dbg_data(ssk->socket, "woke up sleepers\n");
		return;
	}
#endif

	/* Keepalive probe sent cleanup */
	sdp_cnt(sdp_keepalive_probes_sent);

	if (likely(!wc->status))
		return;

	sdp_dbg(ssk->socket, " %s consumes KEEPALIVE status %d\n",
	    __func__, wc->status);

	if (wc->status == IB_WC_WR_FLUSH_ERR)
		return;

	sdp_notify(ssk, ECONNRESET);
}
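
/*
 * Drain the TX completion queue in batches of SDP_NUM_WC.  If anything
 * was reaped, try to post more sends (credits may have been returned)
 * and wake up writers blocked in sendmsg.
 */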
static int
sdp_process_tx_cq(struct sdp_sock *ssk)
{
	struct ib_wc ibwc[SDP_NUM_WC];
	int n, i;
	int wc_processed = 0;

	SDP_WLOCK_ASSERT(ssk);

	if (!ssk->tx_ring.cq) {
		sdp_dbg(ssk->socket, "tx irq on destroyed tx_cq\n");
		return (0);
	}

	do {
		n = ib_poll_cq(ssk->tx_ring.cq, SDP_NUM_WC, ibwc);
		for (i = 0; i < n; ++i) {
			sdp_process_tx_wc(ssk, ibwc + i);
			wc_processed++;
		}
	} while (n == SDP_NUM_WC);

	if (wc_processed) {
		sdp_post_sends(ssk, M_DONTWAIT);
		sdp_prf1(ssk->socket, NULL, "Waking sendmsg. inflight=%d",
		    (u32) tx_ring_posted(ssk));
		sowwakeup(ssk->socket);
	}

	return (wc_processed);
}
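
/*
 * Timer-driven completion sweep.  Reschedules itself while work requests
 * remain in flight so completions are processed even when no further
 * sends trigger polling.
 */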
static void
sdp_poll_tx(struct sdp_sock *ssk)
{
	struct socket *sk = ssk->socket;
	u32 inflight, wc_processed;

	sdp_prf1(ssk->socket, NULL, "TX timeout: inflight=%d, head=%d tail=%d",
	    (u32) tx_ring_posted(ssk),
	    ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring));

	if (unlikely(ssk->state == TCPS_CLOSED)) {
		sdp_warn(sk, "Socket is closed\n");
		goto out;
	}

	wc_processed = sdp_process_tx_cq(ssk);
	if (!wc_processed)
		SDPSTATS_COUNTER_INC(tx_poll_miss);
	else
		SDPSTATS_COUNTER_INC(tx_poll_hit);

	inflight = (u32) tx_ring_posted(ssk);
	sdp_prf1(ssk->socket, NULL, "finished tx processing. inflight = %d",
	    inflight);

	/* If there are still packets in flight and the timer has not already
	 * been scheduled by the Tx routine then schedule it here to guarantee
	 * completion processing of these packets */
	if (inflight)
		callout_reset(&ssk->tx_ring.timer, SDP_TX_POLL_TIMEOUT,
		    sdp_poll_tx_timeout, ssk);
out:
#ifdef SDP_ZCOPY
	if (ssk->tx_ring.rdma_inflight && ssk->tx_ring.rdma_inflight->busy) {
		sdp_prf1(sk, NULL, "RDMA is inflight - arming irq");
		sdp_arm_tx_cq(ssk);
	}
#endif
	return;
}
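
/*
 * Callout wrapper for sdp_poll_tx(); runs with the socket lock held
 * because the callout was initialized with callout_init_rw().
 */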
static void
sdp_poll_tx_timeout(void *data)
{
	struct sdp_sock *ssk = (struct sdp_sock *)data;

	if (!callout_active(&ssk->tx_ring.timer))
		return;
	callout_deactivate(&ssk->tx_ring.timer);
	sdp_poll_tx(ssk);
}
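
/*
 * TX CQ completion interrupt: take the socket lock and poll the queue.
 */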
static void
sdp_tx_irq(struct ib_cq *cq, void *cq_context)
{
	struct sdp_sock *ssk;

	ssk = cq_context;
	sdp_prf1(ssk->socket, NULL, "tx irq");
	sdp_dbg_data(ssk->socket, "Got tx comp interrupt\n");
	SDPSTATS_COUNTER_INC(tx_int_count);
	SDP_WLOCK(ssk);
	sdp_poll_tx(ssk);
	SDP_WUNLOCK(ssk);
}
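
/*
 * Discard every posted-but-uncompleted send, freeing the mbufs; used
 * when the ring is torn down.
 */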
static void
sdp_tx_ring_purge(struct sdp_sock *ssk)
{

	while (tx_ring_posted(ssk)) {
		struct mbuf *mb;

		mb = sdp_send_completion(ssk, ring_tail(ssk->tx_ring));
		if (mb == NULL)
			break;
		m_freem(mb);
	}
}
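
/*
 * Emit an SDP keepalive probe: a zero-length RDMA write that elicits a
 * completion without carrying any data.
 */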
void
sdp_post_keepalive(struct sdp_sock *ssk)
{
	int rc;
	struct ib_send_wr wr, *bad_wr;

	sdp_dbg(ssk->socket, "%s\n", __func__);

	/* All other WR fields stay zero: no sg list, wr_id 0. */
	memset(&wr, 0, sizeof(wr));
	wr.opcode = IB_WR_RDMA_WRITE;

	rc = ib_post_send(ssk->qp, &wr, &bad_wr);
	if (rc) {
		sdp_dbg(ssk->socket,
		    "ib_post_keepalive failed with status %d.\n", rc);
		sdp_notify(ssk, ECONNRESET);
	}

	sdp_cnt(sdp_keepalive_probes_sent);
}
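
/* Asynchronous CQ events are not acted upon. */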
static void
sdp_tx_cq_event_handler(struct ib_event *event, void *data)
{
}
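
/*
 * Allocate the TX ring buffer and create the TX completion queue; both
 * timers are initialized to run under the socket lock.
 */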
int
sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
{
	struct ib_cq *tx_cq;
	int rc = 0;

	sdp_dbg(ssk->socket, "tx ring create\n");
	callout_init_rw(&ssk->tx_ring.timer, &ssk->lock, 0);
	callout_init_rw(&ssk->nagle_timer, &ssk->lock, 0);
	atomic_set(&ssk->tx_ring.head, 1);
	atomic_set(&ssk->tx_ring.tail, 1);

	ssk->tx_ring.buffer = kzalloc(
	    sizeof(*ssk->tx_ring.buffer) * SDP_TX_SIZE, GFP_KERNEL);
	if (!ssk->tx_ring.buffer) {
		rc = -ENOMEM;
		sdp_warn(ssk->socket, "Can't allocate TX Ring size %zd.\n",
		    sizeof(*ssk->tx_ring.buffer) * SDP_TX_SIZE);
		goto out;
	}

	tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_tx_cq_event_handler,
	    ssk, SDP_TX_SIZE, IB_CQ_VECTOR_LEAST_ATTACHED);
	if (IS_ERR(tx_cq)) {
		rc = PTR_ERR(tx_cq);
		sdp_warn(ssk->socket, "Unable to allocate TX CQ: %d.\n", rc);
		goto err_cq;
	}
	ssk->tx_ring.cq = tx_cq;
	ssk->tx_ring.poll_cnt = 0;

	return (0);

err_cq:
	kfree(ssk->tx_ring.buffer);
	ssk->tx_ring.buffer = NULL;
out:
	return (rc);
}
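
/*
 * Tear down in the reverse order of creation: stop and drain the timers,
 * purge any in-flight sends, free the ring buffer, then destroy the CQ.
 * By then the ring must be empty (head == tail).
 */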
void
sdp_tx_ring_destroy(struct sdp_sock *ssk)
{

	sdp_dbg(ssk->socket, "tx ring destroy\n");
	/* Stop under the socket lock; the callouts were initialized with it. */
	SDP_WLOCK(ssk);
	callout_stop(&ssk->tx_ring.timer);
	callout_stop(&ssk->nagle_timer);
	SDP_WUNLOCK(ssk);
	callout_drain(&ssk->tx_ring.timer);
	callout_drain(&ssk->nagle_timer);

	if (ssk->tx_ring.buffer) {
		sdp_tx_ring_purge(ssk);
		kfree(ssk->tx_ring.buffer);
		ssk->tx_ring.buffer = NULL;
	}

	if (ssk->tx_ring.cq) {
		if (ib_destroy_cq(ssk->tx_ring.cq)) {
			sdp_warn(ssk->socket, "destroy cq(%p) failed\n",
			    ssk->tx_ring.cq);
		} else {
			ssk->tx_ring.cq = NULL;
		}
	}

	WARN_ON(ring_head(ssk->tx_ring) != ring_tail(ssk->tx_ring));
}