]> CyberLeo.Net >> Repos - FreeBSD/releng/10.0.git/blob - sys/ofed/drivers/infiniband/ulp/sdp/sdp_zcopy.c
- Copy stable/10 (r259064) to releng/10.0 as part of the
[FreeBSD/releng/10.0.git] / sys / ofed / drivers / infiniband / ulp / sdp / sdp_zcopy.c
1 /*
2  * Copyright (c) 2006 Mellanox Technologies Ltd.  All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the
8  * OpenIB.org BSD license below:
9  *
10  *     Redistribution and use in source and binary forms, with or
11  *     without modification, are permitted provided that the following
12  *     conditions are met:
13  *
14  *      - Redistributions of source code must retain the above
15  *        copyright notice, this list of conditions and the following
16  *        disclaimer.
17  *
18  *      - Redistributions in binary form must reproduce the above
19  *        copyright notice, this list of conditions and the following
20  *        disclaimer in the documentation and/or other materials
21  *        provided with the distribution.
22  *
23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30  * SOFTWARE.
31  */
32 #include <linux/tcp.h>
33 #include <asm/ioctls.h>
34 #include <linux/workqueue.h>
35 #include <linux/net.h>
36 #include <linux/socket.h>
37 #include <net/protocol.h>
38 #include <net/inet_common.h>
39 #include <rdma/rdma_cm.h>
40 #include <rdma/ib_verbs.h>
41 #include <rdma/ib_fmr_pool.h>
42 #include <rdma/ib_umem.h> 
43 #include <net/tcp.h> /* for memcpy_toiovec */
44 #include <asm/io.h>
45 #include <asm/uaccess.h>
46 #include <linux/delay.h>
47 #include "sdp.h"
48
49 static int sdp_post_srcavail(struct socket *sk, struct tx_srcavail_state *tx_sa)
50 {
51         struct sdp_sock *ssk = sdp_sk(sk);
52         struct mbuf *mb;
53         int payload_len;
54         struct page *payload_pg;
55         int off, len;
56         struct ib_umem_chunk *chunk;
57
58         WARN_ON(ssk->tx_sa);
59
60         BUG_ON(!tx_sa);
61         BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey);
62         BUG_ON(!tx_sa->umem);
63         BUG_ON(!tx_sa->umem->chunk_list.next);
64
65         chunk = list_entry(tx_sa->umem->chunk_list.next, struct ib_umem_chunk, list);
66         BUG_ON(!chunk->nmap);
67
68         off = tx_sa->umem->offset;
69         len = tx_sa->umem->length;
70
71         tx_sa->bytes_sent = tx_sa->bytes_acked = 0;
72
73         mb = sdp_alloc_mb_srcavail(sk, len, tx_sa->fmr->fmr->lkey, off, 0);
74         if (!mb) {
75                 return -ENOMEM;
76         }
77         sdp_dbg_data(sk, "sending SrcAvail\n");
78                 
79         TX_SRCAVAIL_STATE(mb) = tx_sa; /* tx_sa is hanged on the mb 
80                                          * but continue to live after mb is freed */
81         ssk->tx_sa = tx_sa;
82
83         /* must have payload inlined in SrcAvail packet in combined mode */
84         payload_len = MIN(tx_sa->umem->page_size - off, len);
85         payload_len = MIN(payload_len, ssk->xmit_size_goal - sizeof(struct sdp_srcah));
86         payload_pg  = sg_page(&chunk->page_list[0]);
87         get_page(payload_pg);
88
89         sdp_dbg_data(sk, "payload: off: 0x%x, pg: %p, len: 0x%x\n",
90                 off, payload_pg, payload_len);
91
92         mb_fill_page_desc(mb, mb_shinfo(mb)->nr_frags,
93                         payload_pg, off, payload_len);
94
95         mb->len             += payload_len;
96         mb->data_len         = payload_len;
97         mb->truesize        += payload_len;
98 //      sk->sk_wmem_queued   += payload_len;
99 //      sk->sk_forward_alloc -= payload_len;
100
101         mb_entail(sk, ssk, mb);
102         
103         ssk->write_seq += payload_len;
104         SDP_SKB_CB(mb)->end_seq += payload_len;
105
106         tx_sa->bytes_sent = tx_sa->umem->length;
107         tx_sa->bytes_acked = payload_len;
108
109         /* TODO: pushing the mb into the tx_queue should be enough */
110
111         return 0;
112 }
113
114 static int sdp_post_srcavail_cancel(struct socket *sk)
115 {
116         struct sdp_sock *ssk = sdp_sk(sk);
117         struct mbuf *mb;
118
119         sdp_dbg_data(ssk->socket, "Posting srcavail cancel\n");
120
121         mb = sdp_alloc_mb_srcavail_cancel(sk, 0);
122         mb_entail(sk, ssk, mb);
123
124         sdp_post_sends(ssk, 0);
125
126         schedule_delayed_work(&ssk->srcavail_cancel_work,
127                         SDP_SRCAVAIL_CANCEL_TIMEOUT);
128
129         return 0;
130 }
131
132 void srcavail_cancel_timeout(struct work_struct *work)
133 {
134         struct sdp_sock *ssk =
135                 container_of(work, struct sdp_sock, srcavail_cancel_work.work);
136         struct socket *sk = ssk->socket;
137
138         lock_sock(sk);
139
140         sdp_dbg_data(sk, "both SrcAvail and SrcAvailCancel timedout."
141                         " closing connection\n");
142         sdp_set_error(sk, -ECONNRESET);
143         wake_up(&ssk->wq);
144
145         release_sock(sk);
146 }
147
148 static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p,
149                 int ignore_signals)
150 {
151         struct socket *sk = ssk->socket;
152         int err = 0;
153         long vm_wait = 0;
154         long current_timeo = *timeo_p;
155         struct tx_srcavail_state *tx_sa = ssk->tx_sa;
156         DEFINE_WAIT(wait);
157
158         sdp_dbg_data(sk, "sleep till RdmaRdCompl. timeo = %ld.\n", *timeo_p);
159         sdp_prf1(sk, NULL, "Going to sleep");
160         while (ssk->qp_active) {
161                 prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
162
163                 if (unlikely(!*timeo_p)) {
164                         err = -ETIME;
165                         tx_sa->abort_flags |= TX_SA_TIMEDOUT;
166                         sdp_prf1(sk, NULL, "timeout");
167                         SDPSTATS_COUNTER_INC(zcopy_tx_timeout);
168                         break;
169                 }
170
171                 else if (tx_sa->bytes_acked > tx_sa->bytes_sent) {
172                         err = -EINVAL;
173                         sdp_dbg_data(sk, "acked bytes > sent bytes\n");
174                         tx_sa->abort_flags |= TX_SA_ERROR;
175                         break;
176                 }
177
178                 if (tx_sa->abort_flags & TX_SA_SENDSM) {
179                         sdp_prf1(sk, NULL, "Aborting SrcAvail sending");
180                         SDPSTATS_COUNTER_INC(zcopy_tx_aborted);
181                         err = -EAGAIN;
182                         break ;
183                 }
184
185                 if (!ignore_signals) {
186                         if (signal_pending(current)) {
187                                 err = -EINTR;
188                                 sdp_prf1(sk, NULL, "signalled");
189                                 tx_sa->abort_flags |= TX_SA_INTRRUPTED;
190                                 break;
191                         }
192
193                         if (ssk->rx_sa && (tx_sa->bytes_acked < tx_sa->bytes_sent)) {
194                                 sdp_dbg_data(sk, "Crossing SrcAvail - aborting this\n");
195                                 tx_sa->abort_flags |= TX_SA_CROSS_SEND;
196                                 SDPSTATS_COUNTER_INC(zcopy_cross_send);
197                                 err = -ETIME;
198                                 break ;
199                         }
200                 }
201
202                 posts_handler_put(ssk);
203
204                 sk_wait_event(sk, &current_timeo,
205                                 tx_sa->abort_flags &&
206                                 ssk->rx_sa &&
207                                 (tx_sa->bytes_acked < tx_sa->bytes_sent) && 
208                                 vm_wait);
209                 sdp_dbg_data(ssk->socket, "woke up sleepers\n");
210
211                 posts_handler_get(ssk);
212
213                 if (tx_sa->bytes_acked == tx_sa->bytes_sent)
214                         break;
215
216                 if (vm_wait) {
217                         vm_wait -= current_timeo;
218                         current_timeo = *timeo_p;
219                         if (current_timeo != MAX_SCHEDULE_TIMEOUT &&
220                             (current_timeo -= vm_wait) < 0)
221                                 current_timeo = 0;
222                         vm_wait = 0;
223                 }
224                 *timeo_p = current_timeo;
225         }
226
227         finish_wait(sk->sk_sleep, &wait);
228
229         sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d/%d bytes, flags: 0x%x\n",
230                         tx_sa->bytes_acked, tx_sa->bytes_sent, tx_sa->abort_flags);
231
232         if (!ssk->qp_active) {
233                 sdp_dbg(sk, "QP destroyed while waiting\n");
234                 return -EINVAL;
235         }
236         return err;
237 }
238
239 static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk)
240 {
241         struct socket *sk = ssk->socket;
242         long timeo = HZ * 5; /* Timeout for for RDMA read */
243         DEFINE_WAIT(wait);
244
245         sdp_dbg_data(sk, "Sleep till RDMA wr finished.\n");
246         while (1) {
247                 prepare_to_wait(sk->sk_sleep, &wait, TASK_UNINTERRUPTIBLE);
248
249                 if (!ssk->tx_ring.rdma_inflight->busy) {
250                         sdp_dbg_data(sk, "got rdma cqe\n");
251                         break;
252                 }
253
254                 if (!ssk->qp_active) {
255                         sdp_dbg_data(sk, "QP destroyed\n");
256                         break;
257                 }
258
259                 if (!timeo) {
260                         sdp_warn(sk, "Panic: Timed out waiting for RDMA read\n");
261                         WARN_ON(1);
262                         break;
263                 }
264
265                 posts_handler_put(ssk);
266
267                 sdp_prf1(sk, NULL, "Going to sleep");
268                 sk_wait_event(sk, &timeo, 
269                         !ssk->tx_ring.rdma_inflight->busy);
270                 sdp_prf1(sk, NULL, "Woke up");
271                 sdp_dbg_data(ssk->socket, "woke up sleepers\n");
272
273                 posts_handler_get(ssk);
274         }
275
276         finish_wait(sk->sk_sleep, &wait);
277
278         sdp_dbg_data(sk, "Finished waiting\n");
279 }
280
281 int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
282                 struct rx_srcavail_state *rx_sa)
283 {
284         struct mbuf *mb;
285         int copied = rx_sa->used - rx_sa->reported;
286
287         if (rx_sa->used <= rx_sa->reported)
288                 return 0;
289
290         mb = sdp_alloc_mb_rdmardcompl(ssk->socket, copied, 0);
291
292         rx_sa->reported += copied;
293
294         /* TODO: What if no tx_credits available? */
295         sdp_post_send(ssk, mb);
296
297         return 0;
298 }
299
300 int sdp_post_sendsm(struct socket *sk)
301 {
302         struct mbuf *mb = sdp_alloc_mb_sendsm(sk, 0);
303
304         sdp_post_send(sdp_sk(sk), mb);
305
306         return 0;
307 }
308
309 static int sdp_update_iov_used(struct socket *sk, struct iovec *iov, int len)
310 {
311         sdp_dbg_data(sk, "updating consumed 0x%x bytes from iov\n", len);
312         while (len > 0) {
313                 if (iov->iov_len) {
314                         int copy = min_t(unsigned int, iov->iov_len, len);
315                         len -= copy;
316                         iov->iov_len -= copy;
317                         iov->iov_base += copy;
318                 }
319                 iov++;
320         }
321
322         return 0;
323 }
324
325 static inline int sge_bytes(struct ib_sge *sge, int sge_cnt)
326 {
327         int bytes = 0;
328
329         while (sge_cnt > 0) {
330                 bytes += sge->length;
331                 sge++;
332                 sge_cnt--;
333         }
334
335         return bytes;
336 }
337 void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack)
338 {
339         struct socket *sk = ssk->socket;
340         unsigned long flags;
341
342         spin_lock_irqsave(&ssk->tx_sa_lock, flags);
343
344         if (!ssk->tx_sa) {
345                 sdp_prf1(sk, NULL, "SendSM for cancelled/finished SrcAvail");
346                 goto out;
347         }
348
349         if (ssk->tx_sa->mseq > mseq_ack) {
350                 sdp_dbg_data(sk, "SendSM arrived for old SrcAvail. "
351                         "SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
352                         mseq_ack, ssk->tx_sa->mseq);
353                 goto out;
354         }
355
356         sdp_dbg_data(sk, "Got SendSM - aborting SrcAvail\n");
357
358         ssk->tx_sa->abort_flags |= TX_SA_SENDSM;
359         cancel_delayed_work(&ssk->srcavail_cancel_work);
360
361         wake_up(sk->sk_sleep);
362         sdp_dbg_data(sk, "woke up sleepers\n");
363
364 out:
365         spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
366 }
367
368 void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack,
369                 u32 bytes_completed)
370 {
371         struct socket *sk = ssk->socket;
372         unsigned long flags;
373
374         sdp_prf1(sk, NULL, "RdmaRdCompl ssk=%p tx_sa=%p", ssk, ssk->tx_sa);
375         sdp_dbg_data(sk, "RdmaRdCompl ssk=%p tx_sa=%p\n", ssk, ssk->tx_sa);
376
377         spin_lock_irqsave(&ssk->tx_sa_lock, flags);
378
379         BUG_ON(!ssk);
380
381         if (!ssk->tx_sa) {
382                 sdp_dbg_data(sk, "Got RdmaRdCompl for aborted SrcAvail\n");
383                 goto out;
384         }
385
386         if (ssk->tx_sa->mseq > mseq_ack) {
387                 sdp_dbg_data(sk, "RdmaRdCompl arrived for old SrcAvail. "
388                         "SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
389                         mseq_ack, ssk->tx_sa->mseq);
390                 goto out;
391         }
392
393         ssk->tx_sa->bytes_acked += bytes_completed;
394
395         wake_up(sk->sk_sleep);
396         sdp_dbg_data(sk, "woke up sleepers\n");
397
398 out:
399         spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
400         return;
401 }
402
403 static unsigned long sdp_get_max_memlockable_bytes(unsigned long offset)
404 {
405         unsigned long avail;
406         unsigned long lock_limit;
407
408         if (capable(CAP_IPC_LOCK))
409                 return ULONG_MAX;
410
411         lock_limit = current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur;
412         avail = lock_limit - (current->mm->locked_vm << PAGE_SHIFT);
413
414         return avail - offset;
415 }
416
417 static int sdp_alloc_fmr(struct socket *sk, void *uaddr, size_t len,
418         struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
419 {
420         struct ib_pool_fmr *fmr;
421         struct ib_umem *umem;
422         struct ib_device *dev;
423         u64 *pages;
424         struct ib_umem_chunk *chunk;
425         int n, j, k;
426         int rc = 0;
427         unsigned long max_lockable_bytes;
428
429         if (unlikely(len > SDP_MAX_RDMA_READ_LEN)) {
430                 sdp_dbg_data(sk, "len:0x%lx > FMR_SIZE: 0x%lx\n",
431                         len, SDP_MAX_RDMA_READ_LEN);
432                 len = SDP_MAX_RDMA_READ_LEN;
433         }
434
435         max_lockable_bytes = sdp_get_max_memlockable_bytes((unsigned long)uaddr & ~PAGE_MASK);
436         if (unlikely(len > max_lockable_bytes)) {
437                 sdp_dbg_data(sk, "len:0x%lx > RLIMIT_MEMLOCK available: 0x%lx\n",
438                         len, max_lockable_bytes);
439                 len = max_lockable_bytes;
440         }
441
442         sdp_dbg_data(sk, "user buf: %p, len:0x%lx max_lockable_bytes: 0x%lx\n",
443                         uaddr, len, max_lockable_bytes);
444
445         umem = ib_umem_get(&sdp_sk(sk)->context, (unsigned long)uaddr, len,
446                 IB_ACCESS_REMOTE_WRITE, 0);
447
448         if (IS_ERR(umem)) {
449                 rc = PTR_ERR(umem);
450                 sdp_warn(sk, "Error doing umem_get 0x%lx bytes: %d\n", len, rc);
451                 sdp_warn(sk, "RLIMIT_MEMLOCK: 0x%lx[cur] 0x%lx[max] CAP_IPC_LOCK: %d\n",
452                                 current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur,
453                                 current->signal->rlim[RLIMIT_MEMLOCK].rlim_max,
454                                 capable(CAP_IPC_LOCK));
455                 goto err_umem_get;
456         }
457
458         sdp_dbg_data(sk, "umem->offset = 0x%x, length = 0x%lx\n",
459                 umem->offset, umem->length);
460
461         pages = (u64 *) __get_free_page(GFP_KERNEL);
462         if (!pages)
463                 goto err_pages_alloc;
464
465         n = 0;
466
467         dev = sdp_sk(sk)->ib_device;
468         list_for_each_entry(chunk, &umem->chunk_list, list) {
469                 for (j = 0; j < chunk->nmap; ++j) {
470                         len = ib_sg_dma_len(dev,
471                                         &chunk->page_list[j]) >> PAGE_SHIFT;
472
473                         for (k = 0; k < len; ++k) {
474                                 pages[n++] = ib_sg_dma_address(dev,
475                                                 &chunk->page_list[j]) +
476                                         umem->page_size * k;
477
478                         }
479                 }
480         }
481
482         fmr = ib_fmr_pool_map_phys(sdp_sk(sk)->sdp_dev->fmr_pool, pages, n, 0);
483         if (IS_ERR(fmr)) {
484                 sdp_warn(sk, "Error allocating fmr: %ld\n", PTR_ERR(fmr));
485                 goto err_fmr_alloc;
486         }
487
488         free_page((unsigned long) pages);
489
490         *_umem = umem;
491         *_fmr = fmr;
492
493         return 0;
494
495 err_fmr_alloc:  
496         free_page((unsigned long) pages);
497
498 err_pages_alloc:
499         ib_umem_release(umem);
500
501 err_umem_get:
502
503         return rc;
504 }
505
506 void sdp_free_fmr(struct socket *sk, struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
507 {
508         if (!sdp_sk(sk)->qp_active)
509                 return;
510
511         ib_fmr_pool_unmap(*_fmr);
512         *_fmr = NULL;
513
514         ib_umem_release(*_umem);
515         *_umem = NULL;
516 }
517
518 static int sdp_post_rdma_read(struct socket *sk, struct rx_srcavail_state *rx_sa)
519 {
520         struct sdp_sock *ssk = sdp_sk(sk);
521         struct ib_send_wr *bad_wr;
522         struct ib_send_wr wr = { NULL };
523         struct ib_sge sge;
524
525         wr.opcode = IB_WR_RDMA_READ;
526         wr.next = NULL;
527         wr.wr_id = SDP_OP_RDMA;
528         wr.wr.rdma.rkey = rx_sa->rkey;
529         wr.send_flags = 0;
530
531         ssk->tx_ring.rdma_inflight = rx_sa;
532
533         sge.addr = rx_sa->umem->offset;
534         sge.length = rx_sa->umem->length;
535         sge.lkey = rx_sa->fmr->fmr->lkey;
536
537         wr.wr.rdma.remote_addr = rx_sa->vaddr + rx_sa->used;
538         wr.num_sge = 1;
539         wr.sg_list = &sge;
540         rx_sa->busy++;
541
542         wr.send_flags = IB_SEND_SIGNALED;
543
544         return ib_post_send(ssk->qp, &wr, &bad_wr);
545 }
546
547 int sdp_rdma_to_iovec(struct socket *sk, struct iovec *iov, struct mbuf *mb,
548                 unsigned long *used)
549 {
550         struct sdp_sock *ssk = sdp_sk(sk);
551         struct rx_srcavail_state *rx_sa = RX_SRCAVAIL_STATE(mb);
552         int got_srcavail_cancel;
553         int rc = 0;
554         int len = *used;
555         int copied;
556
557         sdp_dbg_data(ssk->socket, "preparing RDMA read."
558                 " len: 0x%x. buffer len: 0x%lx\n", len, iov->iov_len);
559
560         sock_hold(sk, SOCK_REF_RDMA_RD);
561
562         if (len > rx_sa->len) {
563                 sdp_warn(sk, "len:0x%x > rx_sa->len: 0x%x\n", len, rx_sa->len);
564                 WARN_ON(1);
565                 len = rx_sa->len;
566         }
567
568         rc = sdp_alloc_fmr(sk, iov->iov_base, len, &rx_sa->fmr, &rx_sa->umem);
569         if (rc) {
570                 sdp_warn(sk, "Error allocating fmr: %d\n", rc);
571                 goto err_alloc_fmr;
572         }
573
574         rc = sdp_post_rdma_read(sk, rx_sa);
575         if (unlikely(rc)) {
576                 sdp_warn(sk, "ib_post_send failed with status %d.\n", rc);
577                 sdp_set_error(ssk->socket, -ECONNRESET);
578                 wake_up(&ssk->wq);
579                 goto err_post_send;
580         }
581
582         sdp_prf(sk, mb, "Finished posting(rc=%d), now to wait", rc);
583
584         got_srcavail_cancel = ssk->srcavail_cancel_mseq > rx_sa->mseq;
585
586         sdp_arm_tx_cq(sk);
587
588         sdp_wait_rdma_wr_finished(ssk);
589
590         sdp_prf(sk, mb, "Finished waiting(rc=%d)", rc);
591         if (!ssk->qp_active) {
592                 sdp_dbg_data(sk, "QP destroyed during RDMA read\n");
593                 rc = -EPIPE;
594                 goto err_post_send;
595         }
596
597         copied = rx_sa->umem->length;
598
599         sdp_update_iov_used(sk, iov, copied);
600         rx_sa->used += copied;
601         atomic_add(copied, &ssk->rcv_nxt);
602         *used = copied;
603
604         ssk->tx_ring.rdma_inflight = NULL;
605
606 err_post_send:
607         sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
608
609 err_alloc_fmr:
610         if (rc && ssk->qp_active) {
611                 sdp_warn(sk, "Couldn't do RDMA - post sendsm\n");
612                 rx_sa->flags |= RX_SA_ABORTED;
613         }
614
615         sock_put(sk, SOCK_REF_RDMA_RD);
616
617         return rc;
618 }
619
620 static inline int wait_for_sndbuf(struct socket *sk, long *timeo_p)
621 {
622         struct sdp_sock *ssk = sdp_sk(sk);
623         int ret = 0;
624         int credits_needed = 1;
625
626         sdp_dbg_data(sk, "Wait for mem\n");
627
628         set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
629
630         SDPSTATS_COUNTER_INC(send_wait_for_mem);
631
632         sdp_do_posts(ssk);
633
634         sdp_xmit_poll(ssk, 1);
635
636         ret = sdp_tx_wait_memory(ssk, timeo_p, &credits_needed);
637
638         return ret;
639 }
640
641 static int do_sdp_sendmsg_zcopy(struct socket *sk, struct tx_srcavail_state *tx_sa,
642                 struct iovec *iov, long *timeo)
643 {
644         struct sdp_sock *ssk = sdp_sk(sk);
645         int rc = 0;
646         unsigned long lock_flags;
647
648         rc = sdp_alloc_fmr(sk, iov->iov_base, iov->iov_len,
649                         &tx_sa->fmr, &tx_sa->umem);
650         if (rc) {
651                 sdp_warn(sk, "Error allocating fmr: %d\n", rc);
652                 goto err_alloc_fmr;
653         }
654
655         if (tx_slots_free(ssk) == 0) {
656                 rc = wait_for_sndbuf(sk, timeo);
657                 if (rc) {
658                         sdp_warn(sk, "Couldn't get send buffer\n");
659                         goto err_no_tx_slots;
660                 }
661         }
662
663         rc = sdp_post_srcavail(sk, tx_sa);
664         if (rc) {
665                 sdp_dbg(sk, "Error posting SrcAvail\n");
666                 goto err_abort_send;
667         }
668
669         rc = sdp_wait_rdmardcompl(ssk, timeo, 0);
670         if (unlikely(rc)) {
671                 enum tx_sa_flag f = tx_sa->abort_flags;
672
673                 if (f & TX_SA_SENDSM) {
674                         sdp_dbg_data(sk, "Got SendSM. use SEND verb.\n");
675                 } else if (f & TX_SA_ERROR) {
676                         sdp_dbg_data(sk, "SrcAvail error completion\n");
677                         sdp_reset(sk);
678                         SDPSTATS_COUNTER_INC(zcopy_tx_error);
679                 } else if (ssk->qp_active) {
680                         sdp_post_srcavail_cancel(sk);
681
682                         /* Wait for RdmaRdCompl/SendSM to
683                          * finish the transaction */
684                         *timeo = 2 * HZ;
685                         sdp_dbg_data(sk, "Waiting for SendSM\n");
686                         sdp_wait_rdmardcompl(ssk, timeo, 1);
687                         sdp_dbg_data(sk, "finished waiting\n");
688
689                         cancel_delayed_work(&ssk->srcavail_cancel_work);
690                 } else {
691                         sdp_dbg_data(sk, "QP was destroyed while waiting\n");
692                 }
693         } else {
694                 sdp_dbg_data(sk, "got RdmaRdCompl\n");
695         }
696
697         spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
698         ssk->tx_sa = NULL;
699         spin_unlock_irqrestore(&ssk->tx_sa_lock, lock_flags);
700
701 err_abort_send:
702         sdp_update_iov_used(sk, iov, tx_sa->bytes_acked);
703
704 err_no_tx_slots:
705         sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);
706
707 err_alloc_fmr:
708         return rc;      
709 }
710
711 int sdp_sendmsg_zcopy(struct kiocb *iocb, struct socket *sk, struct iovec *iov)
712 {
713         struct sdp_sock *ssk = sdp_sk(sk);
714         int rc = 0;
715         long timeo;
716         struct tx_srcavail_state *tx_sa;
717         int offset;
718         size_t bytes_to_copy = 0;
719         int copied = 0;
720
721         sdp_dbg_data(sk, "Sending iov: %p, iov_len: 0x%lx\n",
722                         iov->iov_base, iov->iov_len);
723         sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy start");
724         if (ssk->rx_sa) {
725                 sdp_dbg_data(sk, "Deadlock prevent: crossing SrcAvail\n");
726                 return 0;
727         }
728
729         sock_hold(ssk->socket, SOCK_REF_ZCOPY);
730
731         SDPSTATS_COUNTER_INC(sendmsg_zcopy_segment);
732
733         timeo = SDP_SRCAVAIL_ADV_TIMEOUT ;
734
735         /* Ok commence sending. */
736         offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);
737
738         tx_sa = kmalloc(sizeof(struct tx_srcavail_state), GFP_KERNEL);
739         if (!tx_sa) {
740                 sdp_warn(sk, "Error allocating zcopy context\n");
741                 rc = -EAGAIN; /* Buffer too big - fallback to bcopy */
742                 goto err_alloc_tx_sa;
743         }
744
745         bytes_to_copy = iov->iov_len;
746         do {
747                 tx_sa_reset(tx_sa);
748
749                 rc = do_sdp_sendmsg_zcopy(sk, tx_sa, iov, &timeo);
750
751                 if (iov->iov_len && iov->iov_len < sdp_zcopy_thresh) {
752                         sdp_dbg_data(sk, "0x%lx bytes left, switching to bcopy\n",
753                                 iov->iov_len);
754                         break;
755                 }
756         } while (!rc && iov->iov_len > 0 && !tx_sa->abort_flags);
757
758         kfree(tx_sa);
759 err_alloc_tx_sa:
760         copied = bytes_to_copy - iov->iov_len;
761
762         sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy end rc: %d copied: %d", rc, copied);
763
764         sock_put(ssk->socket, SOCK_REF_ZCOPY);
765
766         if (rc < 0 && rc != -EAGAIN && rc != -ETIME)
767                 return rc;
768
769         return copied;
770 }
771
772 void sdp_abort_srcavail(struct socket *sk)
773 {
774         struct sdp_sock *ssk = sdp_sk(sk);
775         struct tx_srcavail_state *tx_sa = ssk->tx_sa;
776         unsigned long flags;
777
778         if (!tx_sa)
779                 return;
780
781         cancel_delayed_work(&ssk->srcavail_cancel_work);
782         flush_scheduled_work();
783
784         spin_lock_irqsave(&ssk->tx_sa_lock, flags);
785
786         sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);
787
788         ssk->tx_sa = NULL;
789
790         spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
791 }
792
793 void sdp_abort_rdma_read(struct socket *sk)
794 {
795         struct sdp_sock *ssk = sdp_sk(sk);
796         struct rx_srcavail_state *rx_sa = ssk->rx_sa;
797
798         if (!rx_sa)
799                 return;
800
801         sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
802
803         ssk->rx_sa = NULL;
804 }