// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2016-2018 Oracle. All rights reserved.
 *
 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
 */

#include <rdma/rw.h>

#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);

/* Each R/W context contains state for one chain of RDMA Read or
 * Write Work Requests.
 *
 * Each WR chain handles a single contiguous server-side buffer,
 * because scatterlist entries after the first have to start on
 * page alignment. xdr_buf iovecs cannot guarantee alignment.
 *
 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
 * from a client may contain a unique R_key, so each WR chain moves
 * up to one segment at a time.
 *
 * The scatterlist makes this data structure over 4KB in size. To
 * make it less likely to fail, and to handle the allocation for
 * smaller I/O requests without disabling bottom-halves, these
 * contexts are created on demand, but cached and reused until the
 * controlling svcxprt_rdma is destroyed.
 */
struct svc_rdma_rw_ctxt {
	struct llist_node	rw_node;
	struct list_head	rw_list;
	struct rdma_rw_ctx	rw_ctx;
	unsigned int		rw_nents;
	struct sg_table		rw_sg_table;
	struct scatterlist	rw_first_sgl[];
};

static inline struct svc_rdma_rw_ctxt *
svc_rdma_next_ctxt(struct list_head *list)
{
	return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
					rw_list);
}

static struct svc_rdma_rw_ctxt *
svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
{
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;
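
	/* llist_del_first() must be serialized against other consumers;
	 * sc_rw_ctxt_lock provides that serialization. Producers return
	 * contexts to the list with lockless llist_add().
	 */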
	spin_lock(&rdma->sc_rw_ctxt_lock);
	node = llist_del_first(&rdma->sc_rw_ctxts);
	spin_unlock(&rdma->sc_rw_ctxt_lock);
	if (node) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
	} else {
		ctxt = kmalloc(struct_size(ctxt, rw_first_sgl, SG_CHUNK_SIZE),
			       GFP_KERNEL);
		if (!ctxt)
			goto out_noctx;

		INIT_LIST_HEAD(&ctxt->rw_list);
	}
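
	/* Requests of up to SG_CHUNK_SIZE SGEs use the embedded
	 * rw_first_sgl[] array; sg_alloc_table_chained() chains in
	 * additional scatterlist entries only for larger requests.
	 */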
	ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
	if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
				   ctxt->rw_sg_table.sgl,
				   SG_CHUNK_SIZE))
		goto out_free;
	return ctxt;

out_free:
	kfree(ctxt);
out_noctx:
	trace_svcrdma_no_rwctx_err(rdma, sges);
	return NULL;
}

static void __svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
				   struct svc_rdma_rw_ctxt *ctxt,
				   struct llist_head *list)
{
	sg_free_table_chained(&ctxt->rw_sg_table, SG_CHUNK_SIZE);
	llist_add(&ctxt->rw_node, list);
}

static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
				 struct svc_rdma_rw_ctxt *ctxt)
{
	__svc_rdma_put_rw_ctxt(rdma, ctxt, &rdma->sc_rw_ctxts);
}

/**
 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
 * @rdma: transport about to be destroyed
 *
 */
void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;

	while ((node = llist_del_first(&rdma->sc_rw_ctxts)) != NULL) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
		kfree(ctxt);
	}
}

/**
 * svc_rdma_rw_ctx_init - Prepare a R/W context for I/O
 * @rdma: controlling transport instance
 * @ctxt: R/W context to prepare
 * @offset: RDMA offset
 * @handle: RDMA tag/handle
 * @direction: I/O direction
 *
 * Returns the number of WQEs that will be needed on the work
 * queue on success, or a negative errno.
 */
static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma,
				struct svc_rdma_rw_ctxt *ctxt,
				u64 offset, u32 handle,
				enum dma_data_direction direction)
{
	int ret;

	ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp, rdma->sc_port_num,
			       ctxt->rw_sg_table.sgl, ctxt->rw_nents,
			       0, offset, handle, direction);
	if (unlikely(ret < 0)) {
		svc_rdma_put_rw_ctxt(rdma, ctxt);
		trace_svcrdma_dma_map_rw_err(rdma, ctxt->rw_nents, ret);
	}
	return ret;
}

/* A chunk context tracks all I/O for moving one Read or Write
 * chunk. This is a set of rdma_rw's that handle data movement
 * for all segments of one chunk.
 *
 * These are small, acquired with a single allocator call, and
 * no more than one is needed per chunk. They are allocated on
 * demand, and not cached.
 */
struct svc_rdma_chunk_ctxt {
	struct rpc_rdma_cid	cc_cid;
	struct ib_cqe		cc_cqe;
	struct svcxprt_rdma	*cc_rdma;
	struct list_head	cc_rwctxts;
	ktime_t			cc_posttime;
	int			cc_sqecount;
	enum ib_wc_status	cc_status;
	struct completion	cc_done;
};

static void svc_rdma_cc_cid_init(struct svcxprt_rdma *rdma,
				 struct rpc_rdma_cid *cid)
{
	cid->ci_queue_id = rdma->sc_sq_cq->res.id;
	cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids);
}

static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
			     struct svc_rdma_chunk_ctxt *cc)
{
	svc_rdma_cc_cid_init(rdma, &cc->cc_cid);
	cc->cc_rdma = rdma;

	INIT_LIST_HEAD(&cc->cc_rwctxts);
	cc->cc_sqecount = 0;
}

/*
 * The consumed rw_ctx's are cleaned and placed on a local llist so
 * that only one atomic llist operation is needed to put them all
 * back on the free list.
 */
static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
				enum dma_data_direction dir)
{
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct llist_node *first, *last;
	struct svc_rdma_rw_ctxt *ctxt;
	LLIST_HEAD(free);

	first = last = NULL;
	while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
		list_del(&ctxt->rw_list);

		rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
				    rdma->sc_port_num, ctxt->rw_sg_table.sgl,
				    ctxt->rw_nents, dir);
		__svc_rdma_put_rw_ctxt(rdma, ctxt, &free);

		ctxt->rw_node.next = first;
		first = &ctxt->rw_node;
		if (!last)
			last = first;
	}
	if (first)
		llist_add_batch(first, last, &rdma->sc_rw_ctxts);
}

/* State for sending a Write or Reply chunk.
 *  - Tracks progress of writing one chunk over all its segments
 *  - Stores arguments for the SGL constructor functions
 */
struct svc_rdma_write_info {
	const struct svc_rdma_chunk	*wi_chunk;

	/* write state of this chunk */
	unsigned int		wi_seg_off;
	unsigned int		wi_seg_no;

	/* SGL constructor arguments */
	const struct xdr_buf	*wi_xdr;
	unsigned char		*wi_base;
	unsigned int		wi_next_off;

	struct svc_rdma_chunk_ctxt	wi_cc;
};

static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
			  const struct svc_rdma_chunk *chunk)
{
	struct svc_rdma_write_info *info;

	info = kmalloc(sizeof(*info), GFP_KERNEL);
	if (!info)
		return info;

	info->wi_chunk = chunk;
	info->wi_seg_off = 0;
	info->wi_seg_no = 0;
	svc_rdma_cc_init(rdma, &info->wi_cc);
	info->wi_cc.cc_cqe.done = svc_rdma_write_done;
	return info;
}

static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
{
	svc_rdma_cc_release(&info->wi_cc, DMA_TO_DEVICE);
	kfree(info);
}

/**
 * svc_rdma_write_done - Write chunk completion
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 * Pages under I/O are freed by a subsequent Send completion.
 */
static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct svc_rdma_write_info *info =
			container_of(cc, struct svc_rdma_write_info, wi_cc);

	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_write(wc, &cc->cc_cid);
		break;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_write_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_write_err(wc, &cc->cc_cid);
	}

	svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);

	if (unlikely(wc->status != IB_WC_SUCCESS))
		svc_xprt_deferred_close(&rdma->sc_xprt);

	svc_rdma_write_info_free(info);
}

/* State for pulling a Read chunk.
 */
struct svc_rdma_read_info {
	struct svc_rqst			*ri_rqst;
	struct svc_rdma_recv_ctxt	*ri_readctxt;
	unsigned int			ri_pageno;
	unsigned int			ri_pageoff;
	unsigned int			ri_totalbytes;

	struct svc_rdma_chunk_ctxt	ri_cc;
};

static struct svc_rdma_read_info *
svc_rdma_read_info_alloc(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_read_info *info;

	info = kmalloc(sizeof(*info), GFP_KERNEL);
	if (!info)
		return info;

	svc_rdma_cc_init(rdma, &info->ri_cc);
	info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done;
	return info;
}

static void svc_rdma_read_info_free(struct svc_rdma_read_info *info)
{
	svc_rdma_cc_release(&info->ri_cc, DMA_FROM_DEVICE);
	kfree(info);
}

/**
 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 */
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svc_rdma_read_info *info;

	switch (wc->status) {
	case IB_WC_SUCCESS:
		info = container_of(cc, struct svc_rdma_read_info, ri_cc);
		trace_svcrdma_wc_read(wc, &cc->cc_cid, info->ri_totalbytes,
				      cc->cc_posttime);
		break;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_read_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_read_err(wc, &cc->cc_cid);
	}

	svc_rdma_wake_send_waiters(cc->cc_rdma, cc->cc_sqecount);
	cc->cc_status = wc->status;
	complete(&cc->cc_done);
}

/* This function sleeps when the transport's Send Queue is congested.
 *
 * Assumptions:
 * - If ib_post_send() succeeds, only one completion is expected,
 *   even if one or more WRs are flushed. This is true when posting
 *   an rdma_rw_ctx or when posting a single signaled WR.
 */
static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
{
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct ib_send_wr *first_wr;
	const struct ib_send_wr *bad_wr;
	struct list_head *tmp;
	struct ib_cqe *cqe;
	int ret;

	if (cc->cc_sqecount > rdma->sc_sq_depth)
		return -EINVAL;

	first_wr = NULL;
	cqe = &cc->cc_cqe;
	list_for_each(tmp, &cc->cc_rwctxts) {
		struct svc_rdma_rw_ctxt *ctxt;

		ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
		first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		cqe = NULL;
	}
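
	/* Reserve cc_sqecount SQ slots by subtracting them from
	 * sc_sq_avail. If the Send Queue lacks room, release the
	 * reservation and sleep until Send completions free up
	 * enough space to try again.
	 */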
	do {
		if (atomic_sub_return(cc->cc_sqecount,
				      &rdma->sc_sq_avail) > 0) {
			cc->cc_posttime = ktime_get();
			ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
			if (ret)
				break;
			return 0;
		}

		percpu_counter_inc(&svcrdma_stat_sq_starve);
		trace_svcrdma_sq_full(rdma);
		atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
		wait_event(rdma->sc_send_wait,
			   atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
		trace_svcrdma_sq_retry(rdma);
	} while (1);

	trace_svcrdma_sq_post_err(rdma, ret);
	svc_xprt_deferred_close(&rdma->sc_xprt);

	/* If even one was posted, there will be a completion. */
	if (bad_wr != first_wr)
		return 0;

	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
	wake_up(&rdma->sc_send_wait);
	return -ENOTCONN;
}

/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
 */
static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
			       unsigned int len,
			       struct svc_rdma_rw_ctxt *ctxt)
{
	struct scatterlist *sg = ctxt->rw_sg_table.sgl;

	sg_set_buf(&sg[0], info->wi_base, len);
	info->wi_base += len;

	ctxt->rw_nents = 1;
}

/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
 */
static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
				    unsigned int remaining,
				    struct svc_rdma_rw_ctxt *ctxt)
{
	unsigned int sge_no, sge_bytes, page_off, page_no;
	const struct xdr_buf *xdr = info->wi_xdr;
	struct scatterlist *sg;
	struct page **page;

	page_off = info->wi_next_off + xdr->page_base;
	page_no = page_off >> PAGE_SHIFT;
	page_off = offset_in_page(page_off);
	page = xdr->pages + page_no;
	info->wi_next_off += remaining;
	sg = ctxt->rw_sg_table.sgl;
	sge_no = 0;
	do {
		sge_bytes = min_t(unsigned int, remaining,
				  PAGE_SIZE - page_off);
		sg_set_page(sg, *page, sge_bytes, page_off);

		remaining -= sge_bytes;
		sg = sg_next(sg);
		page_off = 0;
		sge_no++;
		page++;
	} while (remaining);

	ctxt->rw_nents = sge_no;
}

/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 * an RPC Reply.
 */
static int
svc_rdma_build_writes(struct svc_rdma_write_info *info,
		      void (*constructor)(struct svc_rdma_write_info *info,
					  unsigned int len,
					  struct svc_rdma_rw_ctxt *ctxt),
		      unsigned int remaining)
{
	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	const struct svc_rdma_segment *seg;
	struct svc_rdma_rw_ctxt *ctxt;
	int ret;

	do {
		unsigned int write_len;
		u64 offset;

		if (info->wi_seg_no >= info->wi_chunk->ch_segcount)
			goto out_overflow;

		seg = &info->wi_chunk->ch_segments[info->wi_seg_no];
		write_len = min(remaining, seg->rs_length - info->wi_seg_off);
		if (!write_len)
			goto out_overflow;
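		/* Worst-case SGE count: one for each page fully covered
		 * by write_len, plus up to two partially-filled pages at
		 * the start and end of the range.
		 */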
		ctxt = svc_rdma_get_rw_ctxt(rdma,
					    (write_len >> PAGE_SHIFT) + 2);
		if (!ctxt)
			return -ENOMEM;

		constructor(info, write_len, ctxt);
		offset = seg->rs_offset + info->wi_seg_off;
		ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle,
					   DMA_TO_DEVICE);
		if (ret < 0)
			return -EIO;
		percpu_counter_inc(&svcrdma_stat_write);

		list_add(&ctxt->rw_list, &cc->cc_rwctxts);
		cc->cc_sqecount += ret;
		if (write_len == seg->rs_length - info->wi_seg_off) {
			info->wi_seg_no++;
			info->wi_seg_off = 0;
		} else {
			info->wi_seg_off += write_len;
		}
		remaining -= write_len;
	} while (remaining);

	return 0;

out_overflow:
	trace_svcrdma_small_wrch_err(rdma, remaining, info->wi_seg_no,
				     info->wi_chunk->ch_segcount);
	return -E2BIG;
}

/**
 * svc_rdma_iov_write - Construct RDMA Writes from an iov
 * @info: pointer to write arguments
 * @iov: kvec to write
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_iov_write(struct svc_rdma_write_info *info,
			      const struct kvec *iov)
{
	info->wi_base = iov->iov_base;
	return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
				     iov->iov_len);
}

/**
 * svc_rdma_pages_write - Construct RDMA Writes from pages
 * @info: pointer to write arguments
 * @xdr: xdr_buf with pages to write
 * @offset: offset into the content of @xdr
 * @length: number of bytes to write
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
				const struct xdr_buf *xdr,
				unsigned int offset,
				unsigned long length)
{
	info->wi_xdr = xdr;
	info->wi_next_off = offset - xdr->head[0].iov_len;
	return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
				     length);
}

/**
 * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
 * @xdr: xdr_buf to write
 * @data: pointer to write arguments
 *
 * Returns:
 *   On success, returns the total number of bytes written (xdr->len)
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
{
	struct svc_rdma_write_info *info = data;
	int ret;

	if (xdr->head[0].iov_len) {
		ret = svc_rdma_iov_write(info, &xdr->head[0]);
		if (ret < 0)
			return ret;
	}

	if (xdr->page_len) {
		ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
					   xdr->page_len);
		if (ret < 0)
			return ret;
	}

	if (xdr->tail[0].iov_len) {
		ret = svc_rdma_iov_write(info, &xdr->tail[0]);
		if (ret < 0)
			return ret;
	}

	return xdr->len;
}

/**
 * svc_rdma_send_write_chunk - Write all segments in a Write chunk
 * @rdma: controlling RDMA transport
 * @chunk: Write chunk provided by the client
 * @xdr: xdr_buf containing the data payload
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Write chunk,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
			      const struct svc_rdma_chunk *chunk,
			      const struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info;
	struct svc_rdma_chunk_ctxt *cc;
	int ret;

	info = svc_rdma_write_info_alloc(rdma, chunk);
	if (!info)
		return -ENOMEM;
	cc = &info->wi_cc;

	ret = svc_rdma_xb_write(xdr, info);
	if (ret != xdr->len)
		goto out_err;

	trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
	ret = svc_rdma_post_chunk_ctxt(cc);
	if (ret < 0)
		goto out_err;
	return xdr->len;

out_err:
	svc_rdma_write_info_free(info);
	return ret;
}

/**
 * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
 * @rdma: controlling RDMA transport
 * @rctxt: Write and Reply chunks from client
 * @xdr: xdr_buf containing an RPC Reply
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Reply chunk,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
			      const struct svc_rdma_recv_ctxt *rctxt,
			      const struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info;
	struct svc_rdma_chunk_ctxt *cc;
	struct svc_rdma_chunk *chunk;
	int ret;

	if (pcl_is_empty(&rctxt->rc_reply_pcl))
		return 0;

	chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
	info = svc_rdma_write_info_alloc(rdma, chunk);
	if (!info)
		return -ENOMEM;
	cc = &info->wi_cc;
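
	/* Write only the non-payload portions of the Reply into the
	 * Reply chunk; byte ranges claimed by Write chunks are conveyed
	 * separately as Write chunk payloads (see
	 * svc_rdma_send_write_chunk()).
	 */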
	ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
				      svc_rdma_xb_write, info);
	if (ret < 0)
		goto out_err;

	trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
	ret = svc_rdma_post_chunk_ctxt(cc);
	if (ret < 0)
		goto out_err;

	return xdr->len;

out_err:
	svc_rdma_write_info_free(info);
	return ret;
}

/**
 * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
 * @info: context for ongoing I/O
 * @segment: co-ordinates of remote memory to be read
 *
 * Returns:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough rq_pages to finish
 *   %-ENOMEM: allocating local resources failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
				       const struct svc_rdma_segment *segment)
{
	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
	struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
	struct svc_rqst *rqstp = info->ri_rqst;
	unsigned int sge_no, seg_len, len;
	struct svc_rdma_rw_ctxt *ctxt;
	struct scatterlist *sg;
	int ret;

	len = segment->rs_length;
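	/* One SGE is needed for each page the segment touches: round
	 * ri_pageoff + len up to a page boundary to count them.
	 */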
	sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
	ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
	if (!ctxt)
		return -ENOMEM;
	ctxt->rw_nents = sge_no;

	sg = ctxt->rw_sg_table.sgl;
	for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
		seg_len = min_t(unsigned int, len,
				PAGE_SIZE - info->ri_pageoff);

		if (!info->ri_pageoff)
			head->rc_page_count++;

		sg_set_page(sg, rqstp->rq_pages[info->ri_pageno],
			    seg_len, info->ri_pageoff);
		sg = sg_next(sg);

		info->ri_pageoff += seg_len;
		if (info->ri_pageoff == PAGE_SIZE) {
			info->ri_pageno++;
			info->ri_pageoff = 0;
		}
		len -= seg_len;

		/* Safety check */
		if (len &&
		    &rqstp->rq_pages[info->ri_pageno + 1] > rqstp->rq_page_end)
			goto out_overrun;
	}

	ret = svc_rdma_rw_ctx_init(cc->cc_rdma, ctxt, segment->rs_offset,
				   segment->rs_handle, DMA_FROM_DEVICE);
	if (ret < 0)
		return -EIO;
	percpu_counter_inc(&svcrdma_stat_read);

	list_add(&ctxt->rw_list, &cc->cc_rwctxts);
	cc->cc_sqecount += ret;
	return 0;

out_overrun:
	trace_svcrdma_page_overrun_err(cc->cc_rdma, rqstp, info->ri_pageno);
	return -EINVAL;
}

/**
 * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
 * @info: context for ongoing I/O
 * @chunk: Read chunk to pull
 *
 * Return values:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: allocating local resources failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_chunk(struct svc_rdma_read_info *info,
				     const struct svc_rdma_chunk *chunk)
{
	const struct svc_rdma_segment *segment;
	int ret;

	ret = -EINVAL;
	pcl_for_each_segment(segment, chunk) {
		ret = svc_rdma_build_read_segment(info, segment);
		if (ret < 0)
			break;
		info->ri_totalbytes += segment->rs_length;
	}
	return ret;
}

/**
 * svc_rdma_copy_inline_range - Copy part of the inline content into pages
 * @info: context for RDMA Reads
 * @offset: offset into the Receive buffer of region to copy
 * @remaining: length of region to copy
 *
 * Take a page at a time from rqstp->rq_pages and copy the inline
 * content from the Receive buffer into that page. Update
 * info->ri_pageno and info->ri_pageoff so that the next RDMA Read
 * result will land contiguously with the copied content.
 *
 * Return values:
 *   %0: Inline content was successfully copied
 *   %-EINVAL: offset or length was incorrect
 */
static int svc_rdma_copy_inline_range(struct svc_rdma_read_info *info,
				      unsigned int offset,
				      unsigned int remaining)
{
	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
	unsigned char *dst, *src = head->rc_recv_buf;
	struct svc_rqst *rqstp = info->ri_rqst;
	unsigned int page_no, numpages;

	numpages = PAGE_ALIGN(info->ri_pageoff + remaining) >> PAGE_SHIFT;
	for (page_no = 0; page_no < numpages; page_no++) {
		unsigned int page_len;

		page_len = min_t(unsigned int, remaining,
				 PAGE_SIZE - info->ri_pageoff);

		if (!info->ri_pageoff)
			head->rc_page_count++;

		dst = page_address(rqstp->rq_pages[info->ri_pageno]);
		memcpy(dst + info->ri_pageoff, src + offset, page_len);

		info->ri_totalbytes += page_len;
		info->ri_pageoff += page_len;
		if (info->ri_pageoff == PAGE_SIZE) {
			info->ri_pageno++;
			info->ri_pageoff = 0;
		}
		remaining -= page_len;
		offset += page_len;
	}
	return 0;
}

/**
 * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks
 * @info: context for RDMA Reads
 *
 * The chunk data lands in rqstp->rq_arg as a series of contiguous pages,
 * like an incoming TCP call.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static noinline int svc_rdma_read_multiple_chunks(struct svc_rdma_read_info *info)
{
	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
	struct xdr_buf *buf = &info->ri_rqst->rq_arg;
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start, length;
	int ret;

	start = 0;
	chunk = pcl_first_chunk(pcl);
	length = chunk->ch_position;
	ret = svc_rdma_copy_inline_range(info, start, length);
	if (ret < 0)
		return ret;
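
	/* Pull each Read chunk, then copy the inline content that sits
	 * between it and the next chunk.
	 */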
	pcl_for_each_chunk(chunk, pcl) {
		ret = svc_rdma_build_read_chunk(info, chunk);
		if (ret < 0)
			return ret;

		next = pcl_next_chunk(pcl, chunk);
		if (!next)
			break;

		start += length;
		length = next->ch_position - info->ri_totalbytes;
		ret = svc_rdma_copy_inline_range(info, start, length);
		if (ret < 0)
			return ret;
	}

	start += length;
	length = head->rc_byte_len - start;
	ret = svc_rdma_copy_inline_range(info, start, length);
	if (ret < 0)
		return ret;

	buf->len += info->ri_totalbytes;
	buf->buflen += info->ri_totalbytes;

	buf->head[0].iov_base = page_address(info->ri_rqst->rq_pages[0]);
	buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes);
	buf->pages = &info->ri_rqst->rq_pages[1];
	buf->page_len = info->ri_totalbytes - buf->head[0].iov_len;
	return 0;
}

/**
 * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
 * @info: context for RDMA Reads
 *
 * The chunk data lands in the page list of rqstp->rq_arg.pages.
 *
 * Currently NFSD does not look at the rqstp->rq_arg.tail[0] kvec.
 * Therefore, XDR round-up of the Read chunk and trailing
 * inline content must both be added at the end of the pagelist.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_data_item(struct svc_rdma_read_info *info)
{
	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
	struct xdr_buf *buf = &info->ri_rqst->rq_arg;
	struct svc_rdma_chunk *chunk;
	unsigned int length;
	int ret;

	chunk = pcl_first_chunk(&head->rc_read_pcl);
	ret = svc_rdma_build_read_chunk(info, chunk);
	if (ret < 0)
		goto out;

	/* Split the Receive buffer between the head and tail
	 * buffers at Read chunk's position. XDR roundup of the
	 * chunk is not included in either the pagelist or in
	 * the tail.
	 */
	buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position;
	buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position;
	buf->head[0].iov_len = chunk->ch_position;

	/* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
	 *
	 * If the client already rounded up the chunk length, the
	 * length does not change. Otherwise, the length of the page
	 * list is increased to include XDR round-up.
	 *
	 * Currently these chunks always start at page offset 0,
	 * thus the rounded-up length never crosses a page boundary.
	 */
	buf->pages = &info->ri_rqst->rq_pages[0];
	length = xdr_align_size(chunk->ch_length);
	buf->page_len = length;
	buf->len += length;
	buf->buflen += length;

out:
	return ret;
}

/**
 * svc_rdma_read_chunk_range - Build RDMA Read WQEs for portion of a chunk
 * @info: context for RDMA Reads
 * @chunk: parsed Call chunk to pull
 * @offset: offset of region to pull
 * @length: length of region to pull
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_chunk_range(struct svc_rdma_read_info *info,
				     const struct svc_rdma_chunk *chunk,
				     unsigned int offset, unsigned int length)
{
	const struct svc_rdma_segment *segment;
	int ret;
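
	/* Walk the chunk's segments, skipping those that fall entirely
	 * before @offset, and build a temporary segment descriptor that
	 * covers only the requested byte range of each remaining segment.
	 */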
	ret = -EINVAL;
	pcl_for_each_segment(segment, chunk) {
		struct svc_rdma_segment dummy;

		if (offset > segment->rs_length) {
			offset -= segment->rs_length;
			continue;
		}

		dummy.rs_handle = segment->rs_handle;
		dummy.rs_length = min_t(u32, length, segment->rs_length) - offset;
		dummy.rs_offset = segment->rs_offset + offset;

		ret = svc_rdma_build_read_segment(info, &dummy);
		if (ret < 0)
			break;

		info->ri_totalbytes += dummy.rs_length;
		length -= dummy.rs_length;
		offset = 0;
	}
	return ret;
}

/**
 * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
 * @info: context for RDMA Reads
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_call_chunk(struct svc_rdma_read_info *info)
{
	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
	const struct svc_rdma_chunk *call_chunk =
			pcl_first_chunk(&head->rc_call_pcl);
	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start, length;
	int ret;

	if (pcl_is_empty(pcl))
		return svc_rdma_build_read_chunk(info, call_chunk);
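
	/* The entire Call message lives in the special Read chunk.
	 * Pull the byte ranges that precede, separate, and follow the
	 * data-item Read chunks from the call chunk itself.
	 */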
	start = 0;
	chunk = pcl_first_chunk(pcl);
	length = chunk->ch_position;
	ret = svc_rdma_read_chunk_range(info, call_chunk, start, length);
	if (ret < 0)
		return ret;

	pcl_for_each_chunk(chunk, pcl) {
		ret = svc_rdma_build_read_chunk(info, chunk);
		if (ret < 0)
			return ret;

		next = pcl_next_chunk(pcl, chunk);
		if (!next)
			break;

		start += length;
		length = next->ch_position - info->ri_totalbytes;
		ret = svc_rdma_read_chunk_range(info, call_chunk,
						start, length);
		if (ret < 0)
			return ret;
	}

	start += length;
	length = call_chunk->ch_length - start;
	return svc_rdma_read_chunk_range(info, call_chunk, start, length);
}

/**
 * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
 * @info: context for RDMA Reads
 *
 * The start of the data lands in the first page just after the
 * Transport header, and the rest lands in rqstp->rq_arg.pages.
 *
 * Assumptions:
 *	- A PZRC is never sent in an RDMA_MSG message, though it's
 *	  allowed by spec.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static noinline int svc_rdma_read_special(struct svc_rdma_read_info *info)
{
	struct xdr_buf *buf = &info->ri_rqst->rq_arg;
	int ret;

	ret = svc_rdma_read_call_chunk(info);
	if (ret < 0)
		goto out;

	buf->len += info->ri_totalbytes;
	buf->buflen += info->ri_totalbytes;

	buf->head[0].iov_base = page_address(info->ri_rqst->rq_pages[0]);
	buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes);
	buf->pages = &info->ri_rqst->rq_pages[1];
	buf->page_len = info->ri_totalbytes - buf->head[0].iov_len;

out:
	return ret;
}

/**
 * svc_rdma_process_read_list - Pull list of Read chunks from the client
 * @rdma: controlling RDMA transport
 * @rqstp: set of pages to use as Read sink buffers
 * @head: pages under I/O collect here
 *
 * The RPC/RDMA protocol assumes that the upper layer's XDR decoders
 * pull each Read chunk as they decode an incoming RPC message.
 *
 * On Linux, however, the server needs to have a fully-constructed RPC
 * message in rqstp->rq_arg when there is a positive return code from
 * ->xpo_recvfrom. So the Read list is safety-checked immediately when
 * it is received, then here the whole Read list is pulled all at once.
 * The ingress RPC message is fully reconstructed once all associated
 * RDMA Reads have completed.
 *
 * Return values:
 *   %1: all needed RDMA Reads were posted successfully,
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
			       struct svc_rqst *rqstp,
			       struct svc_rdma_recv_ctxt *head)
{
	struct svc_rdma_read_info *info;
	struct svc_rdma_chunk_ctxt *cc;
	int ret;

	info = svc_rdma_read_info_alloc(rdma);
	if (!info)
		return -ENOMEM;
	cc = &info->ri_cc;
	info->ri_rqst = rqstp;
	info->ri_readctxt = head;
	info->ri_pageno = 0;
	info->ri_pageoff = 0;
	info->ri_totalbytes = 0;

	if (pcl_is_empty(&head->rc_call_pcl)) {
		if (head->rc_read_pcl.cl_count == 1)
			ret = svc_rdma_read_data_item(info);
		else
			ret = svc_rdma_read_multiple_chunks(info);
	} else
		ret = svc_rdma_read_special(info);
	if (ret < 0)
		goto out_err;

	trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
	init_completion(&cc->cc_done);
	ret = svc_rdma_post_chunk_ctxt(cc);
	if (ret < 0)
		goto out_err;

	ret = 1;
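	/* svc_rdma_wc_read_done() records the final WC status in
	 * cc_status and completes cc_done once the Read chain finishes.
	 */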
	wait_for_completion(&cc->cc_done);
	if (cc->cc_status != IB_WC_SUCCESS)
		ret = -EIO;

	/* rq_respages starts after the last arg page */
	rqstp->rq_respages = &rqstp->rq_pages[head->rc_page_count];
	rqstp->rq_next_page = rqstp->rq_respages + 1;

	/* Ensure svc_rdma_recv_ctxt_put() does not try to release pages */
	head->rc_page_count = 0;

out_err:
	svc_rdma_read_info_free(info);
	return ret;
}