]> CyberLeo.Net >> Repos - FreeBSD/releng/9.2.git/blob - sys/ofed/drivers/infiniband/ulp/sdp/sdp_tx.c
- Copy stable/9 to releng/9.2 as part of the 9.2-RELEASE cycle.
[FreeBSD/releng/9.2.git] / sys / ofed / drivers / infiniband / ulp / sdp / sdp_tx.c
1 /*
2  * Copyright (c) 2009 Mellanox Technologies Ltd.  All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the
8  * OpenIB.org BSD license below:
9  *
10  *     Redistribution and use in source and binary forms, with or
11  *     without modification, are permitted provided that the following
12  *     conditions are met:
13  *
14  *      - Redistributions of source code must retain the above
15  *        copyright notice, this list of conditions and the following
16  *        disclaimer.
17  *
18  *      - Redistributions in binary form must reproduce the above
19  *        copyright notice, this list of conditions and the following
20  *        disclaimer in the documentation and/or other materials
21  *        provided with the distribution.
22  *
23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30  * SOFTWARE.
31  */
32 #include "sdp.h"
33
34 #define sdp_cnt(var) do { (var)++; } while (0)
35
36 SDP_MODPARAM_SINT(sdp_keepalive_probes_sent, 0,
37                 "Total number of keepalive probes sent.");
38
39 static int sdp_process_tx_cq(struct sdp_sock *ssk);
40 static void sdp_poll_tx_timeout(void *data);
41
42 int
43 sdp_xmit_poll(struct sdp_sock *ssk, int force)
44 {
45         int wc_processed = 0;
46
47         SDP_WLOCK_ASSERT(ssk);
48         sdp_prf(ssk->socket, NULL, "%s", __func__);
49
50         /* If we don't have a pending timer, set one up to catch our recent
51            post in case the interface becomes idle */
52         if (!callout_pending(&ssk->tx_ring.timer))
53                 callout_reset(&ssk->tx_ring.timer, SDP_TX_POLL_TIMEOUT,
54                     sdp_poll_tx_timeout, ssk);
55
56         /* Poll the CQ every SDP_TX_POLL_MODER packets */
57         if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
58                 wc_processed = sdp_process_tx_cq(ssk);
59
60         return wc_processed;
61 }
62
63 void
64 sdp_post_send(struct sdp_sock *ssk, struct mbuf *mb)
65 {
66         struct sdp_buf *tx_req;
67         struct sdp_bsdh *h;
68         unsigned long mseq;
69         struct ib_device *dev;
70         struct ib_send_wr *bad_wr;
71         struct ib_sge ibsge[SDP_MAX_SEND_SGES];
72         struct ib_sge *sge;
73         struct ib_send_wr tx_wr = { NULL };
74         int i, rc;
75         u64 addr;
76
77         SDPSTATS_COUNTER_MID_INC(post_send, h->mid);
78         SDPSTATS_HIST(send_size, mb->len);
79
80         if (!ssk->qp_active) {
81                 m_freem(mb);
82                 return;
83         }
84
85         mseq = ring_head(ssk->tx_ring);
86         h = mtod(mb, struct sdp_bsdh *);
87         ssk->tx_packets++;
88         ssk->tx_bytes += mb->m_pkthdr.len;
89
90 #ifdef SDP_ZCOPY
91         if (unlikely(h->mid == SDP_MID_SRCAVAIL)) {
92                 struct tx_srcavail_state *tx_sa = TX_SRCAVAIL_STATE(mb);
93                 if (ssk->tx_sa != tx_sa) {
94                         sdp_dbg_data(ssk->socket, "SrcAvail cancelled "
95                                         "before being sent!\n");
96                         WARN_ON(1);
97                         m_freem(mb);
98                         return;
99                 }
100                 TX_SRCAVAIL_STATE(mb)->mseq = mseq;
101         }
102 #endif
103
104         if (unlikely(mb->m_flags & M_URG))
105                 h->flags = SDP_OOB_PRES | SDP_OOB_PEND;
106         else
107                 h->flags = 0;
108
109         mb->m_flags |= M_RDONLY; /* Don't allow compression once sent. */
110         h->bufs = htons(rx_ring_posted(ssk));
111         h->len = htonl(mb->m_pkthdr.len);
112         h->mseq = htonl(mseq);
113         h->mseq_ack = htonl(mseq_ack(ssk));
114
115         sdp_prf1(ssk->socket, mb, "TX: %s bufs: %d mseq:%ld ack:%d",
116                         mid2str(h->mid), rx_ring_posted(ssk), mseq,
117                         ntohl(h->mseq_ack));
118
119         SDP_DUMP_PACKET(ssk->socket, "TX", mb, h);
120
121         tx_req = &ssk->tx_ring.buffer[mseq & (SDP_TX_SIZE - 1)];
122         tx_req->mb = mb;
123         dev = ssk->ib_device;
124         sge = &ibsge[0];
125         for (i = 0;  mb != NULL; i++, mb = mb->m_next, sge++) {
126                 addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
127                     DMA_TO_DEVICE);
128                 /* TODO: proper error handling */
129                 BUG_ON(ib_dma_mapping_error(dev, addr));
130                 BUG_ON(i >= SDP_MAX_SEND_SGES);
131                 tx_req->mapping[i] = addr;
132                 sge->addr = addr;
133                 sge->length = mb->m_len;
134                 sge->lkey = ssk->sdp_dev->mr->lkey;
135         }
136         tx_wr.next = NULL;
137         tx_wr.wr_id = mseq | SDP_OP_SEND;
138         tx_wr.sg_list = ibsge;
139         tx_wr.num_sge = i;
140         tx_wr.opcode = IB_WR_SEND;
141         tx_wr.send_flags = IB_SEND_SIGNALED;
142         if (unlikely(tx_req->mb->m_flags & M_URG))
143                 tx_wr.send_flags |= IB_SEND_SOLICITED;
144
145         rc = ib_post_send(ssk->qp, &tx_wr, &bad_wr);
146         if (unlikely(rc)) {
147                 sdp_dbg(ssk->socket,
148                                 "ib_post_send failed with status %d.\n", rc);
149
150                 sdp_cleanup_sdp_buf(ssk, tx_req, DMA_TO_DEVICE);
151
152                 sdp_notify(ssk, ECONNRESET);
153                 m_freem(tx_req->mb);
154                 return;
155         }
156
157         atomic_inc(&ssk->tx_ring.head);
158         atomic_dec(&ssk->tx_ring.credits);
159         atomic_set(&ssk->remote_credits, rx_ring_posted(ssk));
160
161         return;
162 }
163
164 static struct mbuf *
165 sdp_send_completion(struct sdp_sock *ssk, int mseq)
166 {
167         struct ib_device *dev;
168         struct sdp_buf *tx_req;
169         struct mbuf *mb = NULL;
170         struct sdp_tx_ring *tx_ring = &ssk->tx_ring;
171
172         if (unlikely(mseq != ring_tail(*tx_ring))) {
173                 printk(KERN_WARNING "Bogus send completion id %d tail %d\n",
174                         mseq, ring_tail(*tx_ring));
175                 goto out;
176         }
177
178         dev = ssk->ib_device;
179         tx_req = &tx_ring->buffer[mseq & (SDP_TX_SIZE - 1)];
180         mb = tx_req->mb;
181         sdp_cleanup_sdp_buf(ssk, tx_req, DMA_TO_DEVICE);
182
183 #ifdef SDP_ZCOPY
184         /* TODO: AIO and real zcopy code; add their context support here */
185         if (BZCOPY_STATE(mb))
186                 BZCOPY_STATE(mb)->busy--;
187 #endif
188
189         atomic_inc(&tx_ring->tail);
190
191 out:
192         return mb;
193 }
194
195 static int
196 sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
197 {
198         struct mbuf *mb = NULL;
199         struct sdp_bsdh *h;
200
201         if (unlikely(wc->status)) {
202                 if (wc->status != IB_WC_WR_FLUSH_ERR) {
203                         sdp_prf(ssk->socket, mb, "Send completion with error. "
204                                 "Status %d", wc->status);
205                         sdp_dbg_data(ssk->socket, "Send completion with error. "
206                                 "Status %d\n", wc->status);
207                         sdp_notify(ssk, ECONNRESET);
208                 }
209         }
210
211         mb = sdp_send_completion(ssk, wc->wr_id);
212         if (unlikely(!mb))
213                 return -1;
214
215         h = mtod(mb, struct sdp_bsdh *);
216         sdp_prf1(ssk->socket, mb, "tx completion. mseq:%d", ntohl(h->mseq));
217         sdp_dbg(ssk->socket, "tx completion. %p %d mseq:%d",
218             mb, mb->m_pkthdr.len, ntohl(h->mseq));
219         m_freem(mb);
220
221         return 0;
222 }
223
224 static inline void
225 sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
226 {
227
228         if (likely(wc->wr_id & SDP_OP_SEND)) {
229                 sdp_handle_send_comp(ssk, wc);
230                 return;
231         }
232
233 #ifdef SDP_ZCOPY
234         if (wc->wr_id & SDP_OP_RDMA) {
235                 /* TODO: handle failed RDMA read cqe */
236
237                 sdp_dbg_data(ssk->socket,
238                     "TX comp: RDMA read. status: %d\n", wc->status);
239                 sdp_prf1(sk, NULL, "TX comp: RDMA read");
240
241                 if (!ssk->tx_ring.rdma_inflight) {
242                         sdp_warn(ssk->socket, "ERROR: unexpected RDMA read\n");
243                         return;
244                 }
245
246                 if (!ssk->tx_ring.rdma_inflight->busy) {
247                         sdp_warn(ssk->socket,
248                             "ERROR: too many RDMA read completions\n");
249                         return;
250                 }
251
252                 /* Only last RDMA read WR is signalled. Order is guaranteed -
253                  * therefore if Last RDMA read WR is completed - all other
254                  * have, too */
255                 ssk->tx_ring.rdma_inflight->busy = 0;
256                 sowwakeup(ssk->socket);
257                 sdp_dbg_data(ssk->socket, "woke up sleepers\n");
258                 return;
259         }
260 #endif
261
262         /* Keepalive probe sent cleanup */
263         sdp_cnt(sdp_keepalive_probes_sent);
264
265         if (likely(!wc->status))
266                 return;
267
268         sdp_dbg(ssk->socket, " %s consumes KEEPALIVE status %d\n",
269                         __func__, wc->status);
270
271         if (wc->status == IB_WC_WR_FLUSH_ERR)
272                 return;
273
274         sdp_notify(ssk, ECONNRESET);
275 }
276
277 static int
278 sdp_process_tx_cq(struct sdp_sock *ssk)
279 {
280         struct ib_wc ibwc[SDP_NUM_WC];
281         int n, i;
282         int wc_processed = 0;
283
284         SDP_WLOCK_ASSERT(ssk);
285
286         if (!ssk->tx_ring.cq) {
287                 sdp_dbg(ssk->socket, "tx irq on destroyed tx_cq\n");
288                 return 0;
289         }
290
291         do {
292                 n = ib_poll_cq(ssk->tx_ring.cq, SDP_NUM_WC, ibwc);
293                 for (i = 0; i < n; ++i) {
294                         sdp_process_tx_wc(ssk, ibwc + i);
295                         wc_processed++;
296                 }
297         } while (n == SDP_NUM_WC);
298
299         if (wc_processed) {
300                 sdp_post_sends(ssk, M_DONTWAIT);
301                 sdp_prf1(sk, NULL, "Waking sendmsg. inflight=%d", 
302                                 (u32) tx_ring_posted(ssk));
303                 sowwakeup(ssk->socket);
304         }
305
306         return wc_processed;
307 }
308
309 static void
310 sdp_poll_tx(struct sdp_sock *ssk)
311 {
312         struct socket *sk = ssk->socket;
313         u32 inflight, wc_processed;
314
315         sdp_prf1(ssk->socket, NULL, "TX timeout: inflight=%d, head=%d tail=%d", 
316                 (u32) tx_ring_posted(ssk),
317                 ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring));
318
319         if (unlikely(ssk->state == TCPS_CLOSED)) {
320                 sdp_warn(sk, "Socket is closed\n");
321                 goto out;
322         }
323
324         wc_processed = sdp_process_tx_cq(ssk);
325         if (!wc_processed)
326                 SDPSTATS_COUNTER_INC(tx_poll_miss);
327         else
328                 SDPSTATS_COUNTER_INC(tx_poll_hit);
329
330         inflight = (u32) tx_ring_posted(ssk);
331         sdp_prf1(ssk->socket, NULL, "finished tx proccessing. inflight = %d",
332             inflight);
333
334         /* If there are still packets in flight and the timer has not already
335          * been scheduled by the Tx routine then schedule it here to guarantee
336          * completion processing of these packets */
337         if (inflight)
338                 callout_reset(&ssk->tx_ring.timer, SDP_TX_POLL_TIMEOUT,
339                     sdp_poll_tx_timeout, ssk);
340 out:
341 #ifdef SDP_ZCOPY
342         if (ssk->tx_ring.rdma_inflight && ssk->tx_ring.rdma_inflight->busy) {
343                 sdp_prf1(sk, NULL, "RDMA is inflight - arming irq");
344                 sdp_arm_tx_cq(ssk);
345         }
346 #endif
347         return;
348 }
349
350 static void
351 sdp_poll_tx_timeout(void *data)
352 {
353         struct sdp_sock *ssk = (struct sdp_sock *)data;
354
355         if (!callout_active(&ssk->tx_ring.timer))
356                 return;
357         callout_deactivate(&ssk->tx_ring.timer);
358         sdp_poll_tx(ssk);
359 }
360
361 static void
362 sdp_tx_irq(struct ib_cq *cq, void *cq_context)
363 {
364         struct sdp_sock *ssk;
365
366         ssk = cq_context;
367         sdp_prf1(ssk->socket, NULL, "tx irq");
368         sdp_dbg_data(ssk->socket, "Got tx comp interrupt\n");
369         SDPSTATS_COUNTER_INC(tx_int_count);
370         SDP_WLOCK(ssk);
371         sdp_poll_tx(ssk);
372         SDP_WUNLOCK(ssk);
373 }
374
375 static
376 void sdp_tx_ring_purge(struct sdp_sock *ssk)
377 {
378         while (tx_ring_posted(ssk)) {
379                 struct mbuf *mb;
380                 mb = sdp_send_completion(ssk, ring_tail(ssk->tx_ring));
381                 if (!mb)
382                         break;
383                 m_freem(mb);
384         }
385 }
386
387 void
388 sdp_post_keepalive(struct sdp_sock *ssk)
389 {
390         int rc;
391         struct ib_send_wr wr, *bad_wr;
392
393         sdp_dbg(ssk->socket, "%s\n", __func__);
394
395         memset(&wr, 0, sizeof(wr));
396
397         wr.next    = NULL;
398         wr.wr_id   = 0;
399         wr.sg_list = NULL;
400         wr.num_sge = 0;
401         wr.opcode  = IB_WR_RDMA_WRITE;
402
403         rc = ib_post_send(ssk->qp, &wr, &bad_wr);
404         if (rc) {
405                 sdp_dbg(ssk->socket,
406                         "ib_post_keepalive failed with status %d.\n", rc);
407                 sdp_notify(ssk, ECONNRESET);
408         }
409
410         sdp_cnt(sdp_keepalive_probes_sent);
411 }
412
413 static void
414 sdp_tx_cq_event_handler(struct ib_event *event, void *data)
415 {
416 }
417
418 int
419 sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
420 {
421         struct ib_cq *tx_cq;
422         int rc = 0;
423
424         sdp_dbg(ssk->socket, "tx ring create\n");
425         callout_init_rw(&ssk->tx_ring.timer, &ssk->lock, 0);
426         callout_init_rw(&ssk->nagle_timer, &ssk->lock, 0);
427         atomic_set(&ssk->tx_ring.head, 1);
428         atomic_set(&ssk->tx_ring.tail, 1);
429
430         ssk->tx_ring.buffer = kzalloc(
431                         sizeof *ssk->tx_ring.buffer * SDP_TX_SIZE, GFP_KERNEL);
432         if (!ssk->tx_ring.buffer) {
433                 rc = -ENOMEM;
434                 sdp_warn(ssk->socket, "Can't allocate TX Ring size %zd.\n",
435                          sizeof(*ssk->tx_ring.buffer) * SDP_TX_SIZE);
436
437                 goto out;
438         }
439
440         tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_tx_cq_event_handler,
441                           ssk, SDP_TX_SIZE, IB_CQ_VECTOR_LEAST_ATTACHED);
442
443         if (IS_ERR(tx_cq)) {
444                 rc = PTR_ERR(tx_cq);
445                 sdp_warn(ssk->socket, "Unable to allocate TX CQ: %d.\n", rc);
446                 goto err_cq;
447         }
448         ssk->tx_ring.cq = tx_cq;
449         ssk->tx_ring.poll_cnt = 0;
450         sdp_arm_tx_cq(ssk);
451
452         return 0;
453
454 err_cq:
455         kfree(ssk->tx_ring.buffer);
456         ssk->tx_ring.buffer = NULL;
457 out:
458         return rc;
459 }
460
461 void
462 sdp_tx_ring_destroy(struct sdp_sock *ssk)
463 {
464
465         sdp_dbg(ssk->socket, "tx ring destroy\n");
466         SDP_WLOCK(ssk);
467         callout_stop(&ssk->tx_ring.timer);
468         callout_stop(&ssk->nagle_timer);
469         SDP_WUNLOCK(ssk);
470         callout_drain(&ssk->tx_ring.timer);
471         callout_drain(&ssk->nagle_timer);
472
473         if (ssk->tx_ring.buffer) {
474                 sdp_tx_ring_purge(ssk);
475
476                 kfree(ssk->tx_ring.buffer);
477                 ssk->tx_ring.buffer = NULL;
478         }
479
480         if (ssk->tx_ring.cq) {
481                 if (ib_destroy_cq(ssk->tx_ring.cq)) {
482                         sdp_warn(ssk->socket, "destroy cq(%p) failed\n",
483                                         ssk->tx_ring.cq);
484                 } else {
485                         ssk->tx_ring.cq = NULL;
486                 }
487         }
488
489         WARN_ON(ring_head(ssk->tx_ring) != ring_tail(ssk->tx_ring));
490 }