1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3 * RDMA Transport Layer
4 *
5 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
6 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
7 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
8 */
9
10 #undef pr_fmt
11 #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
12
13 #include <linux/module.h>
14 #include <linux/mempool.h>
15
16 #include "rtrs-srv.h"
17 #include "rtrs-log.h"
18 #include <rdma/ib_cm.h>
19 #include <rdma/ib_verbs.h>
20
21 MODULE_DESCRIPTION("RDMA Transport Server");
22 MODULE_LICENSE("GPL");
23
24 /* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
25 #define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
26 #define DEFAULT_SESS_QUEUE_DEPTH 512
27 #define MAX_HDR_SIZE PAGE_SIZE
28
/* We guarantee to serve at least 10 paths */
30 #define CHUNK_POOL_SZ 10
31
32 static struct rtrs_rdma_dev_pd dev_pd;
33 static mempool_t *chunk_pool;
34 struct class *rtrs_dev_class;
35 static struct rtrs_srv_ib_ctx ib_ctx;
36
37 static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
38 static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;
39
40 static bool always_invalidate = true;
41 module_param(always_invalidate, bool, 0444);
42 MODULE_PARM_DESC(always_invalidate,
43 "Invalidate memory registration for contiguous memory regions before accessing.");
44
45 module_param_named(max_chunk_size, max_chunk_size, int, 0444);
46 MODULE_PARM_DESC(max_chunk_size,
"Max size for each IO request, in bytes (default: "
__stringify(DEFAULT_MAX_CHUNK_SIZE) " bytes)");
49
50 module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
51 MODULE_PARM_DESC(sess_queue_depth,
52 "Number of buffers for pending I/O requests to allocate per session. Maximum: "
53 __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
54 __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");
55
56 static cpumask_t cq_affinity_mask = { CPU_BITS_ALL };
57
58 static struct workqueue_struct *rtrs_wq;
59
static inline struct rtrs_srv_con *to_srv_con(struct rtrs_con *c)
61 {
62 return container_of(c, struct rtrs_srv_con, c);
63 }
64
static inline struct rtrs_srv_path *to_srv_path(struct rtrs_path *s)
66 {
67 return container_of(s, struct rtrs_srv_path, s);
68 }
69
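/*
 * Advance the path state machine under state_lock; only the transitions
 * CONNECTING -> CONNECTED, CONNECTING/CONNECTED -> CLOSING and
 * CLOSING -> CLOSED are accepted.  Returns true if the state was changed.
 */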
static bool rtrs_srv_change_state(struct rtrs_srv_path *srv_path,
				  enum rtrs_srv_state new_state)
72 {
73 enum rtrs_srv_state old_state;
74 bool changed = false;
75
76 spin_lock_irq(&srv_path->state_lock);
77 old_state = srv_path->state;
78 switch (new_state) {
79 case RTRS_SRV_CONNECTED:
80 if (old_state == RTRS_SRV_CONNECTING)
81 changed = true;
82 break;
83 case RTRS_SRV_CLOSING:
84 if (old_state == RTRS_SRV_CONNECTING ||
85 old_state == RTRS_SRV_CONNECTED)
86 changed = true;
87 break;
88 case RTRS_SRV_CLOSED:
89 if (old_state == RTRS_SRV_CLOSING)
90 changed = true;
91 break;
92 default:
93 break;
94 }
95 if (changed)
96 srv_path->state = new_state;
97 spin_unlock_irq(&srv_path->state_lock);
98
99 return changed;
100 }
101
static void free_id(struct rtrs_srv_op *id)
103 {
104 if (!id)
105 return;
106 kfree(id);
107 }
108
static void rtrs_srv_free_ops_ids(struct rtrs_srv_path *srv_path)
110 {
111 struct rtrs_srv_sess *srv = srv_path->srv;
112 int i;
113
114 if (srv_path->ops_ids) {
115 for (i = 0; i < srv->queue_depth; i++)
116 free_id(srv_path->ops_ids[i]);
117 kfree(srv_path->ops_ids);
118 srv_path->ops_ids = NULL;
119 }
120 }
121
122 static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc);
123
124 static struct ib_cqe io_comp_cqe = {
125 .done = rtrs_srv_rdma_done
126 };
127
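/*
 * Called once the inflight percpu reference drops to zero: release the
 * percpu counter and wake up close_work waiting on complete_done.
 */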
static inline void rtrs_srv_inflight_ref_release(struct percpu_ref *ref)
129 {
130 struct rtrs_srv_path *srv_path = container_of(ref,
131 struct rtrs_srv_path,
132 ids_inflight_ref);
133
134 percpu_ref_exit(&srv_path->ids_inflight_ref);
135 complete(&srv_path->complete_done);
136 }
137
static int rtrs_srv_alloc_ops_ids(struct rtrs_srv_path *srv_path)
139 {
140 struct rtrs_srv_sess *srv = srv_path->srv;
141 struct rtrs_srv_op *id;
142 int i, ret;
143
144 srv_path->ops_ids = kcalloc(srv->queue_depth,
145 sizeof(*srv_path->ops_ids),
146 GFP_KERNEL);
147 if (!srv_path->ops_ids)
148 goto err;
149
150 for (i = 0; i < srv->queue_depth; ++i) {
151 id = kzalloc(sizeof(*id), GFP_KERNEL);
152 if (!id)
153 goto err;
154
155 srv_path->ops_ids[i] = id;
156 }
157
158 ret = percpu_ref_init(&srv_path->ids_inflight_ref,
159 rtrs_srv_inflight_ref_release, 0, GFP_KERNEL);
160 if (ret) {
161 pr_err("Percpu reference init failed\n");
162 goto err;
163 }
164 init_completion(&srv_path->complete_done);
165
166 return 0;
167
168 err:
169 rtrs_srv_free_ops_ids(srv_path);
170 return -ENOMEM;
171 }
172
static inline void rtrs_srv_get_ops_ids(struct rtrs_srv_path *srv_path)
174 {
175 percpu_ref_get(&srv_path->ids_inflight_ref);
176 }
177
static inline void rtrs_srv_put_ops_ids(struct rtrs_srv_path *srv_path)
179 {
180 percpu_ref_put(&srv_path->ids_inflight_ref);
181 }
182
static void rtrs_srv_reg_mr_done(struct ib_cq *cq, struct ib_wc *wc)
184 {
185 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
186 struct rtrs_path *s = con->c.path;
187 struct rtrs_srv_path *srv_path = to_srv_path(s);
188
189 if (wc->status != IB_WC_SUCCESS) {
190 rtrs_err(s, "REG MR failed: %s\n",
191 ib_wc_status_msg(wc->status));
192 close_path(srv_path);
193 return;
194 }
195 }
196
197 static struct ib_cqe local_reg_cqe = {
198 .done = rtrs_srv_reg_mr_done
199 };
200
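/*
 * Answer a READ request by RDMA-writing the chunk back to the client.
 * Depending on need_inval and always_invalidate the write is chained with
 * SEND_WITH_INV and/or REG_MR work requests before the final immediate.
 */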
static int rdma_write_sg(struct rtrs_srv_op *id)
202 {
203 struct rtrs_path *s = id->con->c.path;
204 struct rtrs_srv_path *srv_path = to_srv_path(s);
205 dma_addr_t dma_addr = srv_path->dma_addr[id->msg_id];
206 struct rtrs_srv_mr *srv_mr;
207 struct ib_send_wr inv_wr;
208 struct ib_rdma_wr imm_wr;
209 struct ib_rdma_wr *wr = NULL;
210 enum ib_send_flags flags;
211 size_t sg_cnt;
212 int err, offset;
213 bool need_inval;
214 u32 rkey = 0;
215 struct ib_reg_wr rwr;
216 struct ib_sge *plist;
217 struct ib_sge list;
218
219 sg_cnt = le16_to_cpu(id->rd_msg->sg_cnt);
220 need_inval = le16_to_cpu(id->rd_msg->flags) & RTRS_MSG_NEED_INVAL_F;
221 if (sg_cnt != 1)
222 return -EINVAL;
223
224 offset = 0;
225
226 wr = &id->tx_wr;
227 plist = &id->tx_sg;
228 plist->addr = dma_addr + offset;
229 plist->length = le32_to_cpu(id->rd_msg->desc[0].len);
230
231 /* WR will fail with length error
232 * if this is 0
233 */
234 if (plist->length == 0) {
235 rtrs_err(s, "Invalid RDMA-Write sg list length 0\n");
236 return -EINVAL;
237 }
238
239 plist->lkey = srv_path->s.dev->ib_pd->local_dma_lkey;
240 offset += plist->length;
241
242 wr->wr.sg_list = plist;
243 wr->wr.num_sge = 1;
244 wr->remote_addr = le64_to_cpu(id->rd_msg->desc[0].addr);
245 wr->rkey = le32_to_cpu(id->rd_msg->desc[0].key);
246 if (rkey == 0)
247 rkey = wr->rkey;
248 else
249 /* Only one key is actually used */
250 WARN_ON_ONCE(rkey != wr->rkey);
251
252 wr->wr.opcode = IB_WR_RDMA_WRITE;
253 wr->wr.wr_cqe = &io_comp_cqe;
254 wr->wr.ex.imm_data = 0;
255 wr->wr.send_flags = 0;
256
257 if (need_inval && always_invalidate) {
258 wr->wr.next = &rwr.wr;
259 rwr.wr.next = &inv_wr;
260 inv_wr.next = &imm_wr.wr;
261 } else if (always_invalidate) {
262 wr->wr.next = &rwr.wr;
263 rwr.wr.next = &imm_wr.wr;
264 } else if (need_inval) {
265 wr->wr.next = &inv_wr;
266 inv_wr.next = &imm_wr.wr;
267 } else {
268 wr->wr.next = &imm_wr.wr;
269 }
270 /*
271 * From time to time we have to post signaled sends,
272 * or send queue will fill up and only QP reset can help.
273 */
274 flags = (atomic_inc_return(&id->con->c.wr_cnt) % s->signal_interval) ?
275 0 : IB_SEND_SIGNALED;
276
277 if (need_inval) {
278 inv_wr.sg_list = NULL;
279 inv_wr.num_sge = 0;
280 inv_wr.opcode = IB_WR_SEND_WITH_INV;
281 inv_wr.wr_cqe = &io_comp_cqe;
282 inv_wr.send_flags = 0;
283 inv_wr.ex.invalidate_rkey = rkey;
284 }
285
286 imm_wr.wr.next = NULL;
287 if (always_invalidate) {
288 struct rtrs_msg_rkey_rsp *msg;
289
290 srv_mr = &srv_path->mrs[id->msg_id];
291 rwr.wr.opcode = IB_WR_REG_MR;
292 rwr.wr.wr_cqe = &local_reg_cqe;
293 rwr.wr.num_sge = 0;
294 rwr.mr = srv_mr->mr;
295 rwr.wr.send_flags = 0;
296 rwr.key = srv_mr->mr->rkey;
297 rwr.access = (IB_ACCESS_LOCAL_WRITE |
298 IB_ACCESS_REMOTE_WRITE);
299 msg = srv_mr->iu->buf;
300 msg->buf_id = cpu_to_le16(id->msg_id);
301 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
302 msg->rkey = cpu_to_le32(srv_mr->mr->rkey);
303
304 list.addr = srv_mr->iu->dma_addr;
305 list.length = sizeof(*msg);
306 list.lkey = srv_path->s.dev->ib_pd->local_dma_lkey;
307 imm_wr.wr.sg_list = &list;
308 imm_wr.wr.num_sge = 1;
309 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM;
310 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev,
311 srv_mr->iu->dma_addr,
312 srv_mr->iu->size, DMA_TO_DEVICE);
313 } else {
314 imm_wr.wr.sg_list = NULL;
315 imm_wr.wr.num_sge = 0;
316 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
317 }
318 imm_wr.wr.send_flags = flags;
319 imm_wr.wr.ex.imm_data = cpu_to_be32(rtrs_to_io_rsp_imm(id->msg_id,
320 0, need_inval));
321
322 imm_wr.wr.wr_cqe = &io_comp_cqe;
323 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, dma_addr,
324 offset, DMA_BIDIRECTIONAL);
325
326 err = ib_post_send(id->con->c.qp, &id->tx_wr.wr, NULL);
327 if (err)
328 rtrs_err(s,
329 "Posting RDMA-Write-Request to QP failed, err: %d\n",
330 err);
331
332 return err;
333 }
334
335 /**
336 * send_io_resp_imm() - respond to client with empty IMM on failed READ/WRITE
337 * requests or on successful WRITE request.
338 * @con: the connection to send back result
339 * @id: the id associated with the IO
340 * @errno: the error number of the IO.
341 *
342 * Return 0 on success, errno otherwise.
343 */
static int send_io_resp_imm(struct rtrs_srv_con *con, struct rtrs_srv_op *id,
			    int errno)
346 {
347 struct rtrs_path *s = con->c.path;
348 struct rtrs_srv_path *srv_path = to_srv_path(s);
349 struct ib_send_wr inv_wr, *wr = NULL;
350 struct ib_rdma_wr imm_wr;
351 struct ib_reg_wr rwr;
352 struct rtrs_srv_mr *srv_mr;
353 bool need_inval = false;
354 enum ib_send_flags flags;
355 u32 imm;
356 int err;
357
358 if (id->dir == READ) {
359 struct rtrs_msg_rdma_read *rd_msg = id->rd_msg;
360 size_t sg_cnt;
361
362 need_inval = le16_to_cpu(rd_msg->flags) &
363 RTRS_MSG_NEED_INVAL_F;
364 sg_cnt = le16_to_cpu(rd_msg->sg_cnt);
365
366 if (need_inval) {
367 if (sg_cnt) {
368 inv_wr.wr_cqe = &io_comp_cqe;
369 inv_wr.sg_list = NULL;
370 inv_wr.num_sge = 0;
371 inv_wr.opcode = IB_WR_SEND_WITH_INV;
372 inv_wr.send_flags = 0;
373 /* Only one key is actually used */
374 inv_wr.ex.invalidate_rkey =
375 le32_to_cpu(rd_msg->desc[0].key);
376 } else {
377 WARN_ON_ONCE(1);
378 need_inval = false;
379 }
380 }
381 }
382
383 if (need_inval && always_invalidate) {
384 wr = &inv_wr;
385 inv_wr.next = &rwr.wr;
386 rwr.wr.next = &imm_wr.wr;
387 } else if (always_invalidate) {
388 wr = &rwr.wr;
389 rwr.wr.next = &imm_wr.wr;
390 } else if (need_inval) {
391 wr = &inv_wr;
392 inv_wr.next = &imm_wr.wr;
393 } else {
394 wr = &imm_wr.wr;
395 }
396 /*
397 * From time to time we have to post signalled sends,
398 * or send queue will fill up and only QP reset can help.
399 */
400 flags = (atomic_inc_return(&con->c.wr_cnt) % s->signal_interval) ?
401 0 : IB_SEND_SIGNALED;
402 imm = rtrs_to_io_rsp_imm(id->msg_id, errno, need_inval);
403 imm_wr.wr.next = NULL;
404 if (always_invalidate) {
405 struct ib_sge list;
406 struct rtrs_msg_rkey_rsp *msg;
407
408 srv_mr = &srv_path->mrs[id->msg_id];
409 rwr.wr.next = &imm_wr.wr;
410 rwr.wr.opcode = IB_WR_REG_MR;
411 rwr.wr.wr_cqe = &local_reg_cqe;
412 rwr.wr.num_sge = 0;
413 rwr.wr.send_flags = 0;
414 rwr.mr = srv_mr->mr;
415 rwr.key = srv_mr->mr->rkey;
416 rwr.access = (IB_ACCESS_LOCAL_WRITE |
417 IB_ACCESS_REMOTE_WRITE);
418 msg = srv_mr->iu->buf;
419 msg->buf_id = cpu_to_le16(id->msg_id);
420 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
421 msg->rkey = cpu_to_le32(srv_mr->mr->rkey);
422
423 list.addr = srv_mr->iu->dma_addr;
424 list.length = sizeof(*msg);
425 list.lkey = srv_path->s.dev->ib_pd->local_dma_lkey;
426 imm_wr.wr.sg_list = &list;
427 imm_wr.wr.num_sge = 1;
428 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM;
429 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev,
430 srv_mr->iu->dma_addr,
431 srv_mr->iu->size, DMA_TO_DEVICE);
432 } else {
433 imm_wr.wr.sg_list = NULL;
434 imm_wr.wr.num_sge = 0;
435 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
436 }
437 imm_wr.wr.send_flags = flags;
438 imm_wr.wr.wr_cqe = &io_comp_cqe;
439
440 imm_wr.wr.ex.imm_data = cpu_to_be32(imm);
441
442 err = ib_post_send(id->con->c.qp, wr, NULL);
443 if (err)
444 rtrs_err_rl(s, "Posting RDMA-Reply to QP failed, err: %d\n",
445 err);
446
447 return err;
448 }
449
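/*
 * Move the path to CLOSING and queue the teardown work.  Calling this
 * several times is fine: only the first successful state change queues
 * close_work.
 */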
void close_path(struct rtrs_srv_path *srv_path)
451 {
452 if (rtrs_srv_change_state(srv_path, RTRS_SRV_CLOSING))
453 queue_work(rtrs_wq, &srv_path->close_work);
454 WARN_ON(srv_path->state != RTRS_SRV_CLOSING);
455 }
456
static inline const char *rtrs_srv_state_str(enum rtrs_srv_state state)
458 {
459 switch (state) {
460 case RTRS_SRV_CONNECTING:
461 return "RTRS_SRV_CONNECTING";
462 case RTRS_SRV_CONNECTED:
463 return "RTRS_SRV_CONNECTED";
464 case RTRS_SRV_CLOSING:
465 return "RTRS_SRV_CLOSING";
466 case RTRS_SRV_CLOSED:
467 return "RTRS_SRV_CLOSED";
468 default:
469 return "UNKNOWN";
470 }
471 }
472
473 /**
474 * rtrs_srv_resp_rdma() - Finish an RDMA request
475 *
476 * @id: Internal RTRS operation identifier
477 * @status: Response Code sent to the other side for this operation.
* 0 = success, <0 = error
479 * Context: any
480 *
481 * Finish a RDMA operation. A message is sent to the client and the
482 * corresponding memory areas will be released.
483 */
bool rtrs_srv_resp_rdma(struct rtrs_srv_op *id, int status)
485 {
486 struct rtrs_srv_path *srv_path;
487 struct rtrs_srv_con *con;
488 struct rtrs_path *s;
489 int err;
490
491 if (WARN_ON(!id))
492 return true;
493
494 con = id->con;
495 s = con->c.path;
496 srv_path = to_srv_path(s);
497
498 id->status = status;
499
500 if (srv_path->state != RTRS_SRV_CONNECTED) {
501 rtrs_err_rl(s,
502 "Sending I/O response failed, server path %s is disconnected, path state %s\n",
503 kobject_name(&srv_path->kobj),
504 rtrs_srv_state_str(srv_path->state));
505 goto out;
506 }
507 if (always_invalidate) {
508 struct rtrs_srv_mr *mr = &srv_path->mrs[id->msg_id];
509
510 ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey));
511 }
512 if (atomic_sub_return(1, &con->c.sq_wr_avail) < 0) {
513 rtrs_err(s, "IB send queue full: srv_path=%s cid=%d\n",
514 kobject_name(&srv_path->kobj),
515 con->c.cid);
516 atomic_add(1, &con->c.sq_wr_avail);
517 spin_lock(&con->rsp_wr_wait_lock);
518 list_add_tail(&id->wait_list, &con->rsp_wr_wait_list);
519 spin_unlock(&con->rsp_wr_wait_lock);
520 return false;
521 }
522
523 if (status || id->dir == WRITE || !id->rd_msg->sg_cnt)
524 err = send_io_resp_imm(con, id, status);
525 else
526 err = rdma_write_sg(id);
527
528 if (err) {
529 rtrs_err_rl(s, "IO response failed: %d: srv_path=%s\n", err,
530 kobject_name(&srv_path->kobj));
531 close_path(srv_path);
532 }
533 out:
534 rtrs_srv_put_ops_ids(srv_path);
535 return true;
536 }
537 EXPORT_SYMBOL(rtrs_srv_resp_rdma);
538
539 /**
540 * rtrs_srv_set_sess_priv() - Set private pointer in rtrs_srv.
541 * @srv: Session pointer
542 * @priv: The private pointer that is associated with the session.
543 */
void rtrs_srv_set_sess_priv(struct rtrs_srv_sess *srv, void *priv)
545 {
546 srv->priv = priv;
547 }
548 EXPORT_SYMBOL(rtrs_srv_set_sess_priv);
549
static void unmap_cont_bufs(struct rtrs_srv_path *srv_path)
551 {
552 int i;
553
554 for (i = 0; i < srv_path->mrs_num; i++) {
555 struct rtrs_srv_mr *srv_mr;
556
557 srv_mr = &srv_path->mrs[i];
558 rtrs_iu_free(srv_mr->iu, srv_path->s.dev->ib_dev, 1);
559 ib_dereg_mr(srv_mr->mr);
560 ib_dma_unmap_sg(srv_path->s.dev->ib_dev, srv_mr->sgt.sgl,
561 srv_mr->sgt.nents, DMA_BIDIRECTIONAL);
562 sg_free_table(&srv_mr->sgt);
563 }
564 kfree(srv_path->mrs);
565 }
566
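/*
 * Map the per-session receive chunks into fast registration MRs which are
 * later advertised to the client in the info response.  With
 * always_invalidate every chunk gets its own MR, otherwise chunks are
 * packed into as few MRs as the device allows.
 */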
static int map_cont_bufs(struct rtrs_srv_path *srv_path)
568 {
569 struct rtrs_srv_sess *srv = srv_path->srv;
570 struct rtrs_path *ss = &srv_path->s;
571 int i, mri, err, mrs_num;
572 unsigned int chunk_bits;
573 int chunks_per_mr = 1;
574
/*
 * Here we map queue_depth chunks to MRs. First we have to figure
 * out how many chunks we can map per MR.
 */
579 if (always_invalidate) {
/*
 * In order to invalidate each chunk of memory separately, we need
 * more memory regions (one per chunk).
 */
584 mrs_num = srv->queue_depth;
585 } else {
586 chunks_per_mr =
587 srv_path->s.dev->ib_dev->attrs.max_fast_reg_page_list_len;
588 mrs_num = DIV_ROUND_UP(srv->queue_depth, chunks_per_mr);
589 chunks_per_mr = DIV_ROUND_UP(srv->queue_depth, mrs_num);
590 }
591
592 srv_path->mrs = kcalloc(mrs_num, sizeof(*srv_path->mrs), GFP_KERNEL);
593 if (!srv_path->mrs)
594 return -ENOMEM;
595
596 srv_path->mrs_num = mrs_num;
597
598 for (mri = 0; mri < mrs_num; mri++) {
599 struct rtrs_srv_mr *srv_mr = &srv_path->mrs[mri];
600 struct sg_table *sgt = &srv_mr->sgt;
601 struct scatterlist *s;
602 struct ib_mr *mr;
603 int nr, nr_sgt, chunks;
604
605 chunks = chunks_per_mr * mri;
606 if (!always_invalidate)
607 chunks_per_mr = min_t(int, chunks_per_mr,
608 srv->queue_depth - chunks);
609
610 err = sg_alloc_table(sgt, chunks_per_mr, GFP_KERNEL);
611 if (err)
612 goto err;
613
614 for_each_sg(sgt->sgl, s, chunks_per_mr, i)
615 sg_set_page(s, srv->chunks[chunks + i],
616 max_chunk_size, 0);
617
618 nr_sgt = ib_dma_map_sg(srv_path->s.dev->ib_dev, sgt->sgl,
619 sgt->nents, DMA_BIDIRECTIONAL);
620 if (!nr_sgt) {
621 err = -EINVAL;
622 goto free_sg;
623 }
624 mr = ib_alloc_mr(srv_path->s.dev->ib_pd, IB_MR_TYPE_MEM_REG,
625 nr_sgt);
626 if (IS_ERR(mr)) {
627 err = PTR_ERR(mr);
628 goto unmap_sg;
629 }
630 nr = ib_map_mr_sg(mr, sgt->sgl, nr_sgt,
631 NULL, max_chunk_size);
632 if (nr < 0 || nr < sgt->nents) {
633 err = nr < 0 ? nr : -EINVAL;
634 goto dereg_mr;
635 }
636
637 if (always_invalidate) {
638 srv_mr->iu = rtrs_iu_alloc(1,
639 sizeof(struct rtrs_msg_rkey_rsp),
640 GFP_KERNEL, srv_path->s.dev->ib_dev,
641 DMA_TO_DEVICE, rtrs_srv_rdma_done);
642 if (!srv_mr->iu) {
643 err = -ENOMEM;
644 rtrs_err(ss, "rtrs_iu_alloc(), err: %d\n", err);
645 goto dereg_mr;
646 }
647 }
648 /* Eventually dma addr for each chunk can be cached */
649 for_each_sg(sgt->sgl, s, nr_sgt, i)
650 srv_path->dma_addr[chunks + i] = sg_dma_address(s);
651
652 ib_update_fast_reg_key(mr, ib_inc_rkey(mr->rkey));
653 srv_mr->mr = mr;
654
655 continue;
656 err:
657 while (mri--) {
658 srv_mr = &srv_path->mrs[mri];
659 sgt = &srv_mr->sgt;
660 mr = srv_mr->mr;
661 rtrs_iu_free(srv_mr->iu, srv_path->s.dev->ib_dev, 1);
662 dereg_mr:
663 ib_dereg_mr(mr);
664 unmap_sg:
665 ib_dma_unmap_sg(srv_path->s.dev->ib_dev, sgt->sgl,
666 sgt->nents, DMA_BIDIRECTIONAL);
667 free_sg:
668 sg_free_table(sgt);
669 }
670 kfree(srv_path->mrs);
671
672 return err;
673 }
674
675 chunk_bits = ilog2(srv->queue_depth - 1) + 1;
676 srv_path->mem_bits = (MAX_IMM_PAYL_BITS - chunk_bits);
677
678 return 0;
679 }
680
static void rtrs_srv_hb_err_handler(struct rtrs_con *c)
682 {
683 close_path(to_srv_path(c->path));
684 }
685
static void rtrs_srv_init_hb(struct rtrs_srv_path *srv_path)
687 {
688 rtrs_init_hb(&srv_path->s, &io_comp_cqe,
689 RTRS_HB_INTERVAL_MS,
690 RTRS_HB_MISSED_MAX,
691 rtrs_srv_hb_err_handler,
692 rtrs_wq);
693 }
694
static void rtrs_srv_start_hb(struct rtrs_srv_path *srv_path)
696 {
697 rtrs_start_hb(&srv_path->s);
698 }
699
static void rtrs_srv_stop_hb(struct rtrs_srv_path *srv_path)
701 {
702 rtrs_stop_hb(&srv_path->s);
703 }
704
static void rtrs_srv_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
706 {
707 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
708 struct rtrs_path *s = con->c.path;
709 struct rtrs_srv_path *srv_path = to_srv_path(s);
710 struct rtrs_iu *iu;
711
712 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
713 rtrs_iu_free(iu, srv_path->s.dev->ib_dev, 1);
714
715 if (wc->status != IB_WC_SUCCESS) {
716 rtrs_err(s, "Sess info response send failed: %s\n",
717 ib_wc_status_msg(wc->status));
718 close_path(srv_path);
719 return;
720 }
721 WARN_ON(wc->opcode != IB_WC_SEND);
722 }
723
static void rtrs_srv_path_up(struct rtrs_srv_path *srv_path)
725 {
726 struct rtrs_srv_sess *srv = srv_path->srv;
727 struct rtrs_srv_ctx *ctx = srv->ctx;
728 int up;
729
730 mutex_lock(&srv->paths_ev_mutex);
731 up = ++srv->paths_up;
732 if (up == 1)
733 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_CONNECTED, NULL);
734 mutex_unlock(&srv->paths_ev_mutex);
735
736 /* Mark session as established */
737 srv_path->established = true;
738 }
739
static void rtrs_srv_path_down(struct rtrs_srv_path *srv_path)
741 {
742 struct rtrs_srv_sess *srv = srv_path->srv;
743 struct rtrs_srv_ctx *ctx = srv->ctx;
744
745 if (!srv_path->established)
746 return;
747
748 srv_path->established = false;
749 mutex_lock(&srv->paths_ev_mutex);
750 WARN_ON(!srv->paths_up);
751 if (--srv->paths_up == 0)
752 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_DISCONNECTED, srv->priv);
753 mutex_unlock(&srv->paths_ev_mutex);
754 }
755
static bool exist_pathname(struct rtrs_srv_ctx *ctx,
			   const char *pathname, const uuid_t *path_uuid)
758 {
759 struct rtrs_srv_sess *srv;
760 struct rtrs_srv_path *srv_path;
761 bool found = false;
762
763 mutex_lock(&ctx->srv_mutex);
764 list_for_each_entry(srv, &ctx->srv_list, ctx_list) {
765 mutex_lock(&srv->paths_mutex);
766
/* Paths of the same client (same uuid) may reuse the sessname, so skip that session */
768 if (uuid_equal(&srv->paths_uuid, path_uuid)) {
769 mutex_unlock(&srv->paths_mutex);
770 continue;
771 }
772
773 list_for_each_entry(srv_path, &srv->paths_list, s.entry) {
774 if (strlen(srv_path->s.sessname) == strlen(pathname) &&
775 !strcmp(srv_path->s.sessname, pathname)) {
776 found = true;
777 break;
778 }
779 }
780 mutex_unlock(&srv->paths_mutex);
781 if (found)
782 break;
783 }
784 mutex_unlock(&ctx->srv_mutex);
785 return found;
786 }
787
788 static int post_recv_path(struct rtrs_srv_path *srv_path);
789 static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno);
790
static int process_info_req(struct rtrs_srv_con *con,
			    struct rtrs_msg_info_req *msg)
793 {
794 struct rtrs_path *s = con->c.path;
795 struct rtrs_srv_path *srv_path = to_srv_path(s);
796 struct ib_send_wr *reg_wr = NULL;
797 struct rtrs_msg_info_rsp *rsp;
798 struct rtrs_iu *tx_iu;
799 struct ib_reg_wr *rwr;
800 int mri, err;
801 size_t tx_sz;
802
803 err = post_recv_path(srv_path);
804 if (err) {
805 rtrs_err(s, "post_recv_path(), err: %d\n", err);
806 return err;
807 }
808
809 if (strchr(msg->pathname, '/') || strchr(msg->pathname, '.')) {
rtrs_err(s, "pathname cannot contain '/' or '.'\n");
811 return -EINVAL;
812 }
813
814 if (exist_pathname(srv_path->srv->ctx,
815 msg->pathname, &srv_path->srv->paths_uuid)) {
816 rtrs_err(s, "pathname is duplicated: %s\n", msg->pathname);
817 return -EPERM;
818 }
819 strscpy(srv_path->s.sessname, msg->pathname,
820 sizeof(srv_path->s.sessname));
821
822 rwr = kcalloc(srv_path->mrs_num, sizeof(*rwr), GFP_KERNEL);
823 if (!rwr)
824 return -ENOMEM;
825
826 tx_sz = sizeof(*rsp);
827 tx_sz += sizeof(rsp->desc[0]) * srv_path->mrs_num;
828 tx_iu = rtrs_iu_alloc(1, tx_sz, GFP_KERNEL, srv_path->s.dev->ib_dev,
829 DMA_TO_DEVICE, rtrs_srv_info_rsp_done);
830 if (!tx_iu) {
831 err = -ENOMEM;
832 goto rwr_free;
833 }
834
835 rsp = tx_iu->buf;
836 rsp->type = cpu_to_le16(RTRS_MSG_INFO_RSP);
837 rsp->sg_cnt = cpu_to_le16(srv_path->mrs_num);
838
839 for (mri = 0; mri < srv_path->mrs_num; mri++) {
840 struct ib_mr *mr = srv_path->mrs[mri].mr;
841
842 rsp->desc[mri].addr = cpu_to_le64(mr->iova);
843 rsp->desc[mri].key = cpu_to_le32(mr->rkey);
844 rsp->desc[mri].len = cpu_to_le32(mr->length);
845
846 /*
847 * Fill in reg MR request and chain them *backwards*
848 */
849 rwr[mri].wr.next = mri ? &rwr[mri - 1].wr : NULL;
850 rwr[mri].wr.opcode = IB_WR_REG_MR;
851 rwr[mri].wr.wr_cqe = &local_reg_cqe;
852 rwr[mri].wr.num_sge = 0;
853 rwr[mri].wr.send_flags = 0;
854 rwr[mri].mr = mr;
855 rwr[mri].key = mr->rkey;
856 rwr[mri].access = (IB_ACCESS_LOCAL_WRITE |
857 IB_ACCESS_REMOTE_WRITE);
858 reg_wr = &rwr[mri].wr;
859 }
860
861 err = rtrs_srv_create_path_files(srv_path);
862 if (err)
863 goto iu_free;
864 kobject_get(&srv_path->kobj);
865 get_device(&srv_path->srv->dev);
866 rtrs_srv_change_state(srv_path, RTRS_SRV_CONNECTED);
867 rtrs_srv_start_hb(srv_path);
868
869 /*
870 * We do not account number of established connections at the current
871 * moment, we rely on the client, which should send info request when
872 * all connections are successfully established. Thus, simply notify
873 * listener with a proper event if we are the first path.
874 */
875 rtrs_srv_path_up(srv_path);
876
877 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev,
878 tx_iu->dma_addr,
879 tx_iu->size, DMA_TO_DEVICE);
880
881 /* Send info response */
882 err = rtrs_iu_post_send(&con->c, tx_iu, tx_sz, reg_wr);
883 if (err) {
884 rtrs_err(s, "rtrs_iu_post_send(), err: %d\n", err);
885 iu_free:
886 rtrs_iu_free(tx_iu, srv_path->s.dev->ib_dev, 1);
887 }
888 rwr_free:
889 kfree(rwr);
890
891 return err;
892 }
893
static void rtrs_srv_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
895 {
896 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
897 struct rtrs_path *s = con->c.path;
898 struct rtrs_srv_path *srv_path = to_srv_path(s);
899 struct rtrs_msg_info_req *msg;
900 struct rtrs_iu *iu;
901 int err;
902
903 WARN_ON(con->c.cid);
904
905 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
906 if (wc->status != IB_WC_SUCCESS) {
907 rtrs_err(s, "Sess info request receive failed: %s\n",
908 ib_wc_status_msg(wc->status));
909 goto close;
910 }
911 WARN_ON(wc->opcode != IB_WC_RECV);
912
913 if (wc->byte_len < sizeof(*msg)) {
914 rtrs_err(s, "Sess info request is malformed: size %d\n",
915 wc->byte_len);
916 goto close;
917 }
918 ib_dma_sync_single_for_cpu(srv_path->s.dev->ib_dev, iu->dma_addr,
919 iu->size, DMA_FROM_DEVICE);
920 msg = iu->buf;
921 if (le16_to_cpu(msg->type) != RTRS_MSG_INFO_REQ) {
922 rtrs_err(s, "Sess info request is malformed: type %d\n",
923 le16_to_cpu(msg->type));
924 goto close;
925 }
926 err = process_info_req(con, msg);
927 if (err)
928 goto close;
929
930 out:
931 rtrs_iu_free(iu, srv_path->s.dev->ib_dev, 1);
932 return;
933 close:
934 close_path(srv_path);
935 goto out;
936 }
937
static int post_recv_info_req(struct rtrs_srv_con *con)
939 {
940 struct rtrs_path *s = con->c.path;
941 struct rtrs_srv_path *srv_path = to_srv_path(s);
942 struct rtrs_iu *rx_iu;
943 int err;
944
945 rx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req),
946 GFP_KERNEL, srv_path->s.dev->ib_dev,
947 DMA_FROM_DEVICE, rtrs_srv_info_req_done);
948 if (!rx_iu)
949 return -ENOMEM;
950 /* Prepare for getting info response */
951 err = rtrs_iu_post_recv(&con->c, rx_iu);
952 if (err) {
953 rtrs_err(s, "rtrs_iu_post_recv(), err: %d\n", err);
954 rtrs_iu_free(rx_iu, srv_path->s.dev->ib_dev, 1);
955 return err;
956 }
957
958 return 0;
959 }
960
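/* Post q_size empty receive WRs, one per expected RDMA-with-immediate. */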
static int post_recv_io(struct rtrs_srv_con *con, size_t q_size)
962 {
963 int i, err;
964
965 for (i = 0; i < q_size; i++) {
966 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
967 if (err)
968 return err;
969 }
970
971 return 0;
972 }
973
static int post_recv_path(struct rtrs_srv_path *srv_path)
975 {
976 struct rtrs_srv_sess *srv = srv_path->srv;
977 struct rtrs_path *s = &srv_path->s;
978 size_t q_size;
979 int err, cid;
980
981 for (cid = 0; cid < srv_path->s.con_num; cid++) {
982 if (cid == 0)
983 q_size = SERVICE_CON_QUEUE_DEPTH;
984 else
985 q_size = srv->queue_depth;
986
987 err = post_recv_io(to_srv_con(srv_path->s.con[cid]), q_size);
988 if (err) {
989 rtrs_err(s, "post_recv_io(), err: %d\n", err);
990 return err;
991 }
992 }
993
994 return 0;
995 }
996
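/*
 * Handle an RTRS_MSG_READ request: fill in the ops id for the chunk and
 * pass the user and data payloads to the upper layer via the rdma_ev
 * callback; on callback failure an error response is sent immediately.
 */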
static void process_read(struct rtrs_srv_con *con,
			 struct rtrs_msg_rdma_read *msg,
			 u32 buf_id, u32 off)
1000 {
1001 struct rtrs_path *s = con->c.path;
1002 struct rtrs_srv_path *srv_path = to_srv_path(s);
1003 struct rtrs_srv_sess *srv = srv_path->srv;
1004 struct rtrs_srv_ctx *ctx = srv->ctx;
1005 struct rtrs_srv_op *id;
1006
1007 size_t usr_len, data_len;
1008 void *data;
1009 int ret;
1010
1011 if (srv_path->state != RTRS_SRV_CONNECTED) {
1012 rtrs_err_rl(s,
1013 "Processing read request failed, session is disconnected, sess state %s\n",
1014 rtrs_srv_state_str(srv_path->state));
1015 return;
1016 }
1017 if (msg->sg_cnt != 1 && msg->sg_cnt != 0) {
1018 rtrs_err_rl(s,
1019 "Processing read request failed, invalid message\n");
1020 return;
1021 }
1022 rtrs_srv_get_ops_ids(srv_path);
1023 rtrs_srv_update_rdma_stats(srv_path->stats, off, READ);
1024 id = srv_path->ops_ids[buf_id];
1025 id->con = con;
1026 id->dir = READ;
1027 id->msg_id = buf_id;
1028 id->rd_msg = msg;
1029 usr_len = le16_to_cpu(msg->usr_len);
1030 data_len = off - usr_len;
1031 data = page_address(srv->chunks[buf_id]);
1032 ret = ctx->ops.rdma_ev(srv->priv, id, READ, data, data_len,
1033 data + data_len, usr_len);
1034
1035 if (ret) {
1036 rtrs_err_rl(s,
1037 "Processing read request failed, user module cb reported for msg_id %d, err: %d\n",
1038 buf_id, ret);
1039 goto send_err_msg;
1040 }
1041
1042 return;
1043
1044 send_err_msg:
1045 ret = send_io_resp_imm(con, id, ret);
1046 if (ret < 0) {
1047 rtrs_err_rl(s,
1048 "Sending err msg for failed RDMA-Write-Req failed, msg_id %d, err: %d\n",
1049 buf_id, ret);
1050 close_path(srv_path);
1051 }
1052 rtrs_srv_put_ops_ids(srv_path);
1053 }
1054
static void process_write(struct rtrs_srv_con *con,
			  struct rtrs_msg_rdma_write *req,
			  u32 buf_id, u32 off)
1058 {
1059 struct rtrs_path *s = con->c.path;
1060 struct rtrs_srv_path *srv_path = to_srv_path(s);
1061 struct rtrs_srv_sess *srv = srv_path->srv;
1062 struct rtrs_srv_ctx *ctx = srv->ctx;
1063 struct rtrs_srv_op *id;
1064
1065 size_t data_len, usr_len;
1066 void *data;
1067 int ret;
1068
1069 if (srv_path->state != RTRS_SRV_CONNECTED) {
1070 rtrs_err_rl(s,
1071 "Processing write request failed, session is disconnected, sess state %s\n",
1072 rtrs_srv_state_str(srv_path->state));
1073 return;
1074 }
1075 rtrs_srv_get_ops_ids(srv_path);
1076 rtrs_srv_update_rdma_stats(srv_path->stats, off, WRITE);
1077 id = srv_path->ops_ids[buf_id];
1078 id->con = con;
1079 id->dir = WRITE;
1080 id->msg_id = buf_id;
1081
1082 usr_len = le16_to_cpu(req->usr_len);
1083 data_len = off - usr_len;
1084 data = page_address(srv->chunks[buf_id]);
1085 ret = ctx->ops.rdma_ev(srv->priv, id, WRITE, data, data_len,
1086 data + data_len, usr_len);
1087 if (ret) {
1088 rtrs_err_rl(s,
1089 "Processing write request failed, user module callback reports err: %d\n",
1090 ret);
1091 goto send_err_msg;
1092 }
1093
1094 return;
1095
1096 send_err_msg:
1097 ret = send_io_resp_imm(con, id, ret);
1098 if (ret < 0) {
1099 rtrs_err_rl(s,
1100 "Processing write request failed, sending I/O response failed, msg_id %d, err: %d\n",
1101 buf_id, ret);
1102 close_path(srv_path);
1103 }
1104 rtrs_srv_put_ops_ids(srv_path);
1105 }
1106
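/* Sync the chunk for the CPU and dispatch by message type (read/write). */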
static void process_io_req(struct rtrs_srv_con *con, void *msg,
			   u32 id, u32 off)
1109 {
1110 struct rtrs_path *s = con->c.path;
1111 struct rtrs_srv_path *srv_path = to_srv_path(s);
1112 struct rtrs_msg_rdma_hdr *hdr;
1113 unsigned int type;
1114
1115 ib_dma_sync_single_for_cpu(srv_path->s.dev->ib_dev,
1116 srv_path->dma_addr[id],
1117 max_chunk_size, DMA_BIDIRECTIONAL);
1118 hdr = msg;
1119 type = le16_to_cpu(hdr->type);
1120
1121 switch (type) {
1122 case RTRS_MSG_WRITE:
1123 process_write(con, msg, id, off);
1124 break;
1125 case RTRS_MSG_READ:
1126 process_read(con, msg, id, off);
1127 break;
1128 default:
1129 rtrs_err(s,
1130 "Processing I/O request failed, unknown message type received: 0x%02x\n",
1131 type);
1132 goto err;
1133 }
1134
1135 return;
1136
1137 err:
1138 close_path(srv_path);
1139 }
1140
static void rtrs_srv_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
1142 {
1143 struct rtrs_srv_mr *mr =
1144 container_of(wc->wr_cqe, typeof(*mr), inv_cqe);
1145 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
1146 struct rtrs_path *s = con->c.path;
1147 struct rtrs_srv_path *srv_path = to_srv_path(s);
1148 struct rtrs_srv_sess *srv = srv_path->srv;
1149 u32 msg_id, off;
1150 void *data;
1151
1152 if (wc->status != IB_WC_SUCCESS) {
1153 rtrs_err(s, "Failed IB_WR_LOCAL_INV: %s\n",
1154 ib_wc_status_msg(wc->status));
1155 close_path(srv_path);
1156 }
1157 msg_id = mr->msg_id;
1158 off = mr->msg_off;
1159 data = page_address(srv->chunks[msg_id]) + off;
1160 process_io_req(con, data, msg_id, off);
1161 }
1162
static int rtrs_srv_inv_rkey(struct rtrs_srv_con *con,
			     struct rtrs_srv_mr *mr)
1165 {
1166 struct ib_send_wr wr = {
1167 .opcode = IB_WR_LOCAL_INV,
1168 .wr_cqe = &mr->inv_cqe,
1169 .send_flags = IB_SEND_SIGNALED,
1170 .ex.invalidate_rkey = mr->mr->rkey,
1171 };
1172 mr->inv_cqe.done = rtrs_srv_inv_rkey_done;
1173
1174 return ib_post_send(con->c.qp, &wr, NULL);
1175 }
1176
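/*
 * Retry responses that were queued on rsp_wr_wait_list because the send
 * queue was full; stop as soon as one of them has to wait again.
 */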
static void rtrs_rdma_process_wr_wait_list(struct rtrs_srv_con *con)
1178 {
1179 spin_lock(&con->rsp_wr_wait_lock);
1180 while (!list_empty(&con->rsp_wr_wait_list)) {
1181 struct rtrs_srv_op *id;
1182 int ret;
1183
1184 id = list_entry(con->rsp_wr_wait_list.next,
1185 struct rtrs_srv_op, wait_list);
1186 list_del(&id->wait_list);
1187
1188 spin_unlock(&con->rsp_wr_wait_lock);
1189 ret = rtrs_srv_resp_rdma(id, id->status);
1190 spin_lock(&con->rsp_wr_wait_lock);
1191
1192 if (!ret) {
1193 list_add(&id->wait_list, &con->rsp_wr_wait_list);
1194 break;
1195 }
1196 }
1197 spin_unlock(&con->rsp_wr_wait_lock);
1198 }
1199
static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc)
1201 {
1202 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
1203 struct rtrs_path *s = con->c.path;
1204 struct rtrs_srv_path *srv_path = to_srv_path(s);
1205 struct rtrs_srv_sess *srv = srv_path->srv;
1206 u32 imm_type, imm_payload;
1207 int err;
1208
1209 if (wc->status != IB_WC_SUCCESS) {
1210 if (wc->status != IB_WC_WR_FLUSH_ERR) {
1211 rtrs_err(s,
1212 "%s (wr_cqe: %p, type: %d, vendor_err: 0x%x, len: %u)\n",
1213 ib_wc_status_msg(wc->status), wc->wr_cqe,
1214 wc->opcode, wc->vendor_err, wc->byte_len);
1215 close_path(srv_path);
1216 }
1217 return;
1218 }
1219
1220 switch (wc->opcode) {
1221 case IB_WC_RECV_RDMA_WITH_IMM:
1222 /*
1223 * post_recv() RDMA write completions of IO reqs (read/write)
1224 * and hb
1225 */
1226 if (WARN_ON(wc->wr_cqe != &io_comp_cqe))
1227 return;
1228 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
1229 if (err) {
1230 rtrs_err(s, "rtrs_post_recv(), err: %d\n", err);
1231 close_path(srv_path);
1232 break;
1233 }
1234 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data),
1235 &imm_type, &imm_payload);
1236 if (imm_type == RTRS_IO_REQ_IMM) {
1237 u32 msg_id, off;
1238 void *data;
1239
1240 msg_id = imm_payload >> srv_path->mem_bits;
1241 off = imm_payload & ((1 << srv_path->mem_bits) - 1);
1242 if (msg_id >= srv->queue_depth || off >= max_chunk_size) {
1243 rtrs_err(s, "Wrong msg_id %u, off %u\n",
1244 msg_id, off);
1245 close_path(srv_path);
1246 return;
1247 }
1248 if (always_invalidate) {
1249 struct rtrs_srv_mr *mr = &srv_path->mrs[msg_id];
1250
1251 mr->msg_off = off;
1252 mr->msg_id = msg_id;
1253 err = rtrs_srv_inv_rkey(con, mr);
1254 if (err) {
1255 rtrs_err(s, "rtrs_post_recv(), err: %d\n",
1256 err);
1257 close_path(srv_path);
1258 break;
1259 }
1260 } else {
1261 data = page_address(srv->chunks[msg_id]) + off;
1262 process_io_req(con, data, msg_id, off);
1263 }
1264 } else if (imm_type == RTRS_HB_MSG_IMM) {
1265 WARN_ON(con->c.cid);
1266 rtrs_send_hb_ack(&srv_path->s);
1267 } else if (imm_type == RTRS_HB_ACK_IMM) {
1268 WARN_ON(con->c.cid);
1269 srv_path->s.hb_missed_cnt = 0;
1270 } else {
1271 rtrs_wrn(s, "Unknown IMM type %u\n", imm_type);
1272 }
1273 break;
1274 case IB_WC_RDMA_WRITE:
1275 case IB_WC_SEND:
1276 /*
1277 * post_send() RDMA write completions of IO reqs (read/write)
1278 * and hb.
1279 */
1280 atomic_add(s->signal_interval, &con->c.sq_wr_avail);
1281
1282 if (!list_empty_careful(&con->rsp_wr_wait_list))
1283 rtrs_rdma_process_wr_wait_list(con);
1284
1285 break;
1286 default:
1287 rtrs_wrn(s, "Unexpected WC type: %d\n", wc->opcode);
1288 return;
1289 }
1290 }
1291
1292 /**
* rtrs_srv_get_path_name() - Get the name of a connected path.
* @srv: Session
* @pathname: Pathname buffer
* @len: Length of pathname buffer
1297 */
int rtrs_srv_get_path_name(struct rtrs_srv_sess *srv, char *pathname,
			   size_t len)
1300 {
1301 struct rtrs_srv_path *srv_path;
1302 int err = -ENOTCONN;
1303
1304 mutex_lock(&srv->paths_mutex);
1305 list_for_each_entry(srv_path, &srv->paths_list, s.entry) {
1306 if (srv_path->state != RTRS_SRV_CONNECTED)
1307 continue;
1308 strscpy(pathname, srv_path->s.sessname,
1309 min_t(size_t, sizeof(srv_path->s.sessname), len));
1310 err = 0;
1311 break;
1312 }
1313 mutex_unlock(&srv->paths_mutex);
1314
1315 return err;
1316 }
1317 EXPORT_SYMBOL(rtrs_srv_get_path_name);
1318
1319 /**
1320 * rtrs_srv_get_queue_depth() - Get rtrs_srv qdepth.
1321 * @srv: Session
1322 */
int rtrs_srv_get_queue_depth(struct rtrs_srv_sess *srv)
1324 {
1325 return srv->queue_depth;
1326 }
1327 EXPORT_SYMBOL(rtrs_srv_get_queue_depth);
1328
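/*
 * Pick the next CPU from cq_affinity_mask in a round-robin fashion,
 * wrapping around when running past nr_cpu_ids or the number of
 * completion vectors of the device.
 */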
static int find_next_bit_ring(struct rtrs_srv_path *srv_path)
1330 {
1331 struct ib_device *ib_dev = srv_path->s.dev->ib_dev;
1332 int v;
1333
1334 v = cpumask_next(srv_path->cur_cq_vector, &cq_affinity_mask);
1335 if (v >= nr_cpu_ids || v >= ib_dev->num_comp_vectors)
1336 v = cpumask_first(&cq_affinity_mask);
1337 return v;
1338 }
1339
static int rtrs_srv_get_next_cq_vector(struct rtrs_srv_path *srv_path)
1341 {
1342 srv_path->cur_cq_vector = find_next_bit_ring(srv_path);
1343
1344 return srv_path->cur_cq_vector;
1345 }
1346
static void rtrs_srv_dev_release(struct device *dev)
1348 {
1349 struct rtrs_srv_sess *srv = container_of(dev, struct rtrs_srv_sess,
1350 dev);
1351
1352 kfree(srv);
1353 }
1354
static void free_srv(struct rtrs_srv_sess *srv)
1356 {
1357 int i;
1358
1359 WARN_ON(refcount_read(&srv->refcount));
1360 for (i = 0; i < srv->queue_depth; i++)
1361 mempool_free(srv->chunks[i], chunk_pool);
1362 kfree(srv->chunks);
1363 mutex_destroy(&srv->paths_mutex);
1364 mutex_destroy(&srv->paths_ev_mutex);
1365 /* last put to release the srv structure */
1366 put_device(&srv->dev);
1367 }
1368
static struct rtrs_srv_sess *get_or_create_srv(struct rtrs_srv_ctx *ctx,
					       const uuid_t *paths_uuid,
					       bool first_conn)
1372 {
1373 struct rtrs_srv_sess *srv;
1374 int i;
1375
1376 mutex_lock(&ctx->srv_mutex);
1377 list_for_each_entry(srv, &ctx->srv_list, ctx_list) {
1378 if (uuid_equal(&srv->paths_uuid, paths_uuid) &&
1379 refcount_inc_not_zero(&srv->refcount)) {
1380 mutex_unlock(&ctx->srv_mutex);
1381 return srv;
1382 }
1383 }
1384 mutex_unlock(&ctx->srv_mutex);
1385 /*
1386 * If this request is not the first connection request from the
1387 * client for this session then fail and return error.
1388 */
1389 if (!first_conn) {
1390 pr_err_ratelimited("Error: Not the first connection request for this session\n");
1391 return ERR_PTR(-ENXIO);
1392 }
1393
1394 /* need to allocate a new srv */
1395 srv = kzalloc(sizeof(*srv), GFP_KERNEL);
1396 if (!srv)
1397 return ERR_PTR(-ENOMEM);
1398
1399 INIT_LIST_HEAD(&srv->paths_list);
1400 mutex_init(&srv->paths_mutex);
1401 mutex_init(&srv->paths_ev_mutex);
1402 uuid_copy(&srv->paths_uuid, paths_uuid);
1403 srv->queue_depth = sess_queue_depth;
1404 srv->ctx = ctx;
1405 device_initialize(&srv->dev);
1406 srv->dev.release = rtrs_srv_dev_release;
1407
1408 srv->chunks = kcalloc(srv->queue_depth, sizeof(*srv->chunks),
1409 GFP_KERNEL);
1410 if (!srv->chunks)
1411 goto err_free_srv;
1412
1413 for (i = 0; i < srv->queue_depth; i++) {
1414 srv->chunks[i] = mempool_alloc(chunk_pool, GFP_KERNEL);
1415 if (!srv->chunks[i])
1416 goto err_free_chunks;
1417 }
1418 refcount_set(&srv->refcount, 1);
1419 mutex_lock(&ctx->srv_mutex);
1420 list_add(&srv->ctx_list, &ctx->srv_list);
1421 mutex_unlock(&ctx->srv_mutex);
1422
1423 return srv;
1424
1425 err_free_chunks:
1426 while (i--)
1427 mempool_free(srv->chunks[i], chunk_pool);
1428 kfree(srv->chunks);
1429
1430 err_free_srv:
1431 kfree(srv);
1432 return ERR_PTR(-ENOMEM);
1433 }
1434
static void put_srv(struct rtrs_srv_sess *srv)
1436 {
1437 if (refcount_dec_and_test(&srv->refcount)) {
1438 struct rtrs_srv_ctx *ctx = srv->ctx;
1439
1440 WARN_ON(srv->dev.kobj.state_in_sysfs);
1441
1442 mutex_lock(&ctx->srv_mutex);
1443 list_del(&srv->ctx_list);
1444 mutex_unlock(&ctx->srv_mutex);
1445 free_srv(srv);
1446 }
1447 }
1448
static void __add_path_to_srv(struct rtrs_srv_sess *srv,
			      struct rtrs_srv_path *srv_path)
1451 {
1452 list_add_tail(&srv_path->s.entry, &srv->paths_list);
1453 srv->paths_num++;
1454 WARN_ON(srv->paths_num >= MAX_PATHS_NUM);
1455 }
1456
static void del_path_from_srv(struct rtrs_srv_path *srv_path)
1458 {
1459 struct rtrs_srv_sess *srv = srv_path->srv;
1460
1461 if (WARN_ON(!srv))
1462 return;
1463
1464 mutex_lock(&srv->paths_mutex);
1465 list_del(&srv_path->s.entry);
1466 WARN_ON(!srv->paths_num);
1467 srv->paths_num--;
1468 mutex_unlock(&srv->paths_mutex);
1469 }
1470
/* return 0 if the addresses (of the same family) match, non-zero otherwise */
static int sockaddr_cmp(const struct sockaddr *a, const struct sockaddr *b)
1473 {
1474 switch (a->sa_family) {
1475 case AF_IB:
1476 return memcmp(&((struct sockaddr_ib *)a)->sib_addr,
1477 &((struct sockaddr_ib *)b)->sib_addr,
1478 sizeof(struct ib_addr)) &&
1479 (b->sa_family == AF_IB);
1480 case AF_INET:
1481 return memcmp(&((struct sockaddr_in *)a)->sin_addr,
1482 &((struct sockaddr_in *)b)->sin_addr,
1483 sizeof(struct in_addr)) &&
1484 (b->sa_family == AF_INET);
1485 case AF_INET6:
1486 return memcmp(&((struct sockaddr_in6 *)a)->sin6_addr,
1487 &((struct sockaddr_in6 *)b)->sin6_addr,
1488 sizeof(struct in6_addr)) &&
1489 (b->sa_family == AF_INET6);
1490 default:
1491 return -ENOENT;
1492 }
1493 }
1494
static bool __is_path_w_addr_exists(struct rtrs_srv_sess *srv,
				    struct rdma_addr *addr)
1497 {
1498 struct rtrs_srv_path *srv_path;
1499
1500 list_for_each_entry(srv_path, &srv->paths_list, s.entry)
1501 if (!sockaddr_cmp((struct sockaddr *)&srv_path->s.dst_addr,
1502 (struct sockaddr *)&addr->dst_addr) &&
1503 !sockaddr_cmp((struct sockaddr *)&srv_path->s.src_addr,
1504 (struct sockaddr *)&addr->src_addr))
1505 return true;
1506
1507 return false;
1508 }
1509
static void free_path(struct rtrs_srv_path *srv_path)
1511 {
1512 if (srv_path->kobj.state_in_sysfs) {
1513 kobject_del(&srv_path->kobj);
1514 kobject_put(&srv_path->kobj);
1515 } else {
1516 kfree(srv_path->stats);
1517 kfree(srv_path);
1518 }
1519 }
1520
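/*
 * Teardown worker: disconnect and drain all connections, wait for inflight
 * requests to finish, notify the upper layer, free the IO resources and
 * finally release the path.
 */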
static void rtrs_srv_close_work(struct work_struct *work)
1522 {
1523 struct rtrs_srv_path *srv_path;
1524 struct rtrs_srv_con *con;
1525 int i;
1526
1527 srv_path = container_of(work, typeof(*srv_path), close_work);
1528
1529 rtrs_srv_destroy_path_files(srv_path);
1530 rtrs_srv_stop_hb(srv_path);
1531
1532 for (i = 0; i < srv_path->s.con_num; i++) {
1533 if (!srv_path->s.con[i])
1534 continue;
1535 con = to_srv_con(srv_path->s.con[i]);
1536 rdma_disconnect(con->c.cm_id);
1537 ib_drain_qp(con->c.qp);
1538 }
1539
1540 /*
1541 * Degrade ref count to the usual model with a single shared
1542 * atomic_t counter
1543 */
1544 percpu_ref_kill(&srv_path->ids_inflight_ref);
1545
/* Wait for all inflight requests to complete */
1547 wait_for_completion(&srv_path->complete_done);
1548
1549 /* Notify upper layer if we are the last path */
1550 rtrs_srv_path_down(srv_path);
1551
1552 unmap_cont_bufs(srv_path);
1553 rtrs_srv_free_ops_ids(srv_path);
1554
1555 for (i = 0; i < srv_path->s.con_num; i++) {
1556 if (!srv_path->s.con[i])
1557 continue;
1558 con = to_srv_con(srv_path->s.con[i]);
1559 rtrs_cq_qp_destroy(&con->c);
1560 rdma_destroy_id(con->c.cm_id);
1561 kfree(con);
1562 }
1563 rtrs_ib_dev_put(srv_path->s.dev);
1564
1565 del_path_from_srv(srv_path);
1566 put_srv(srv_path->srv);
1567 srv_path->srv = NULL;
1568 rtrs_srv_change_state(srv_path, RTRS_SRV_CLOSED);
1569
1570 kfree(srv_path->dma_addr);
1571 kfree(srv_path->s.con);
1572 free_path(srv_path);
1573 }
1574
static int rtrs_rdma_do_accept(struct rtrs_srv_path *srv_path,
			       struct rdma_cm_id *cm_id)
1577 {
1578 struct rtrs_srv_sess *srv = srv_path->srv;
1579 struct rtrs_msg_conn_rsp msg;
1580 struct rdma_conn_param param;
1581 int err;
1582
1583 param = (struct rdma_conn_param) {
1584 .rnr_retry_count = 7,
1585 .private_data = &msg,
1586 .private_data_len = sizeof(msg),
1587 };
1588
1589 msg = (struct rtrs_msg_conn_rsp) {
1590 .magic = cpu_to_le16(RTRS_MAGIC),
1591 .version = cpu_to_le16(RTRS_PROTO_VER),
1592 .queue_depth = cpu_to_le16(srv->queue_depth),
1593 .max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE),
1594 .max_hdr_size = cpu_to_le32(MAX_HDR_SIZE),
1595 };
1596
1597 if (always_invalidate)
1598 msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F);
1599
err = rdma_accept(cm_id, &param);
1601 if (err)
1602 pr_err("rdma_accept(), err: %d\n", err);
1603
1604 return err;
1605 }
1606
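/*
 * Reject the connect request with a consumer-defined reject message that
 * carries the magic, the protocol version and the errno for the client.
 */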
static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno)
1608 {
1609 struct rtrs_msg_conn_rsp msg;
1610 int err;
1611
1612 msg = (struct rtrs_msg_conn_rsp) {
1613 .magic = cpu_to_le16(RTRS_MAGIC),
1614 .version = cpu_to_le16(RTRS_PROTO_VER),
1615 .errno = cpu_to_le16(errno),
1616 };
1617
1618 err = rdma_reject(cm_id, &msg, sizeof(msg), IB_CM_REJ_CONSUMER_DEFINED);
1619 if (err)
1620 pr_err("rdma_reject(), err: %d\n", err);
1621
1622 /* Bounce errno back */
1623 return errno;
1624 }
1625
1626 static struct rtrs_srv_path *
__find_path(struct rtrs_srv_sess *srv, const uuid_t *sess_uuid)
1628 {
1629 struct rtrs_srv_path *srv_path;
1630
1631 list_for_each_entry(srv_path, &srv->paths_list, s.entry) {
1632 if (uuid_equal(&srv_path->s.uuid, sess_uuid))
1633 return srv_path;
1634 }
1635
1636 return NULL;
1637 }
1638
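/*
 * Allocate one connection, size its send/receive queues (connection 0 is
 * the service connection, the others carry IO) and create the CQ/QP pair.
 */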
static int create_con(struct rtrs_srv_path *srv_path,
		      struct rdma_cm_id *cm_id,
		      unsigned int cid)
1642 {
1643 struct rtrs_srv_sess *srv = srv_path->srv;
1644 struct rtrs_path *s = &srv_path->s;
1645 struct rtrs_srv_con *con;
1646
1647 u32 cq_num, max_send_wr, max_recv_wr, wr_limit;
1648 int err, cq_vector;
1649
1650 con = kzalloc(sizeof(*con), GFP_KERNEL);
1651 if (!con) {
1652 err = -ENOMEM;
1653 goto err;
1654 }
1655
1656 spin_lock_init(&con->rsp_wr_wait_lock);
1657 INIT_LIST_HEAD(&con->rsp_wr_wait_list);
1658 con->c.cm_id = cm_id;
1659 con->c.path = &srv_path->s;
1660 con->c.cid = cid;
1661 atomic_set(&con->c.wr_cnt, 1);
1662 wr_limit = srv_path->s.dev->ib_dev->attrs.max_qp_wr;
1663
1664 if (con->c.cid == 0) {
1665 /*
1666 * All receive and all send (each requiring invalidate)
1667 * + 2 for drain and heartbeat
1668 */
1669 max_send_wr = min_t(int, wr_limit,
1670 SERVICE_CON_QUEUE_DEPTH * 2 + 2);
1671 max_recv_wr = max_send_wr;
1672 s->signal_interval = min_not_zero(srv->queue_depth,
1673 (size_t)SERVICE_CON_QUEUE_DEPTH);
1674 } else {
/* when always_invalidate is enabled, we need linv+rinv+mr+imm */
1676 if (always_invalidate)
1677 max_send_wr =
1678 min_t(int, wr_limit,
1679 srv->queue_depth * (1 + 4) + 1);
1680 else
1681 max_send_wr =
1682 min_t(int, wr_limit,
1683 srv->queue_depth * (1 + 2) + 1);
1684
1685 max_recv_wr = srv->queue_depth + 1;
/*
 * If all receive requests and all write requests are posted, each
 * read request additionally requires an invalidate request + drain,
 * otherwise the qp gets into the error state.
 */
1692 }
1693 cq_num = max_send_wr + max_recv_wr;
1694 atomic_set(&con->c.sq_wr_avail, max_send_wr);
1695 cq_vector = rtrs_srv_get_next_cq_vector(srv_path);
1696
1697 /* TODO: SOFTIRQ can be faster, but be careful with softirq context */
1698 err = rtrs_cq_qp_create(&srv_path->s, &con->c, 1, cq_vector, cq_num,
1699 max_send_wr, max_recv_wr,
1700 IB_POLL_WORKQUEUE);
1701 if (err) {
1702 rtrs_err(s, "rtrs_cq_qp_create(), err: %d\n", err);
1703 goto free_con;
1704 }
1705 if (con->c.cid == 0) {
1706 err = post_recv_info_req(con);
1707 if (err)
1708 goto free_cqqp;
1709 }
1710 WARN_ON(srv_path->s.con[cid]);
1711 srv_path->s.con[cid] = &con->c;
1712
1713 /*
1714 * Change context from server to current connection. The other
1715 * way is to use cm_id->qp->qp_context, which does not work on OFED.
1716 */
1717 cm_id->context = &con->c;
1718
1719 return 0;
1720
1721 free_cqqp:
1722 rtrs_cq_qp_destroy(&con->c);
1723 free_con:
1724 kfree(con);
1725
1726 err:
1727 return err;
1728 }
1729
static struct rtrs_srv_path *__alloc_path(struct rtrs_srv_sess *srv,
					  struct rdma_cm_id *cm_id,
					  unsigned int con_num,
					  unsigned int recon_cnt,
					  const uuid_t *uuid)
1735 {
1736 struct rtrs_srv_path *srv_path;
1737 int err = -ENOMEM;
1738 char str[NAME_MAX];
1739 struct rtrs_addr path;
1740
1741 if (srv->paths_num >= MAX_PATHS_NUM) {
1742 err = -ECONNRESET;
1743 goto err;
1744 }
1745 if (__is_path_w_addr_exists(srv, &cm_id->route.addr)) {
1746 err = -EEXIST;
1747 pr_err("Path with same addr exists\n");
1748 goto err;
1749 }
1750 srv_path = kzalloc(sizeof(*srv_path), GFP_KERNEL);
1751 if (!srv_path)
1752 goto err;
1753
1754 srv_path->stats = kzalloc(sizeof(*srv_path->stats), GFP_KERNEL);
1755 if (!srv_path->stats)
1756 goto err_free_sess;
1757
1758 srv_path->stats->srv_path = srv_path;
1759
1760 srv_path->dma_addr = kcalloc(srv->queue_depth,
1761 sizeof(*srv_path->dma_addr),
1762 GFP_KERNEL);
1763 if (!srv_path->dma_addr)
1764 goto err_free_stats;
1765
1766 srv_path->s.con = kcalloc(con_num, sizeof(*srv_path->s.con),
1767 GFP_KERNEL);
1768 if (!srv_path->s.con)
1769 goto err_free_dma_addr;
1770
1771 srv_path->state = RTRS_SRV_CONNECTING;
1772 srv_path->srv = srv;
1773 srv_path->cur_cq_vector = -1;
1774 srv_path->s.dst_addr = cm_id->route.addr.dst_addr;
1775 srv_path->s.src_addr = cm_id->route.addr.src_addr;
1776
1777 /* temporary until receiving session-name from client */
1778 path.src = &srv_path->s.src_addr;
1779 path.dst = &srv_path->s.dst_addr;
1780 rtrs_addr_to_str(&path, str, sizeof(str));
1781 strscpy(srv_path->s.sessname, str, sizeof(srv_path->s.sessname));
1782
1783 srv_path->s.con_num = con_num;
1784 srv_path->s.irq_con_num = con_num;
1785 srv_path->s.recon_cnt = recon_cnt;
1786 uuid_copy(&srv_path->s.uuid, uuid);
1787 spin_lock_init(&srv_path->state_lock);
1788 INIT_WORK(&srv_path->close_work, rtrs_srv_close_work);
1789 rtrs_srv_init_hb(srv_path);
1790
1791 srv_path->s.dev = rtrs_ib_dev_find_or_add(cm_id->device, &dev_pd);
1792 if (!srv_path->s.dev) {
1793 err = -ENOMEM;
1794 goto err_free_con;
1795 }
1796 err = map_cont_bufs(srv_path);
1797 if (err)
1798 goto err_put_dev;
1799
1800 err = rtrs_srv_alloc_ops_ids(srv_path);
1801 if (err)
1802 goto err_unmap_bufs;
1803
1804 __add_path_to_srv(srv, srv_path);
1805
1806 return srv_path;
1807
1808 err_unmap_bufs:
1809 unmap_cont_bufs(srv_path);
1810 err_put_dev:
1811 rtrs_ib_dev_put(srv_path->s.dev);
1812 err_free_con:
1813 kfree(srv_path->s.con);
1814 err_free_dma_addr:
1815 kfree(srv_path->dma_addr);
1816 err_free_stats:
1817 kfree(srv_path->stats);
1818 err_free_sess:
1819 kfree(srv_path);
1820 err:
1821 return ERR_PTR(err);
1822 }
1823
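/*
 * Handle RDMA_CM_EVENT_CONNECT_REQUEST: validate the rtrs connect request,
 * look up or create the session and path, create the connection and either
 * accept or reject it.
 */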
static int rtrs_rdma_connect(struct rdma_cm_id *cm_id,
			     const struct rtrs_msg_conn_req *msg,
			     size_t len)
1827 {
1828 struct rtrs_srv_ctx *ctx = cm_id->context;
1829 struct rtrs_srv_path *srv_path;
1830 struct rtrs_srv_sess *srv;
1831
1832 u16 version, con_num, cid;
1833 u16 recon_cnt;
1834 int err = -ECONNRESET;
1835
1836 if (len < sizeof(*msg)) {
1837 pr_err("Invalid RTRS connection request\n");
1838 goto reject_w_err;
1839 }
1840 if (le16_to_cpu(msg->magic) != RTRS_MAGIC) {
1841 pr_err("Invalid RTRS magic\n");
1842 goto reject_w_err;
1843 }
1844 version = le16_to_cpu(msg->version);
1845 if (version >> 8 != RTRS_PROTO_VER_MAJOR) {
1846 pr_err("Unsupported major RTRS version: %d, expected %d\n",
1847 version >> 8, RTRS_PROTO_VER_MAJOR);
1848 goto reject_w_err;
1849 }
1850 con_num = le16_to_cpu(msg->cid_num);
1851 if (con_num > 4096) {
1852 /* Sanity check */
1853 pr_err("Too many connections requested: %d\n", con_num);
1854 goto reject_w_err;
1855 }
1856 cid = le16_to_cpu(msg->cid);
1857 if (cid >= con_num) {
1858 /* Sanity check */
1859 pr_err("Incorrect cid: %d >= %d\n", cid, con_num);
1860 goto reject_w_err;
1861 }
1862 recon_cnt = le16_to_cpu(msg->recon_cnt);
1863 srv = get_or_create_srv(ctx, &msg->paths_uuid, msg->first_conn);
1864 if (IS_ERR(srv)) {
1865 err = PTR_ERR(srv);
1866 pr_err("get_or_create_srv(), error %d\n", err);
1867 goto reject_w_err;
1868 }
1869 mutex_lock(&srv->paths_mutex);
1870 srv_path = __find_path(srv, &msg->sess_uuid);
1871 if (srv_path) {
1872 struct rtrs_path *s = &srv_path->s;
1873
1874 /* Session already holds a reference */
1875 put_srv(srv);
1876
1877 if (srv_path->state != RTRS_SRV_CONNECTING) {
1878 rtrs_err(s, "Session in wrong state: %s\n",
1879 rtrs_srv_state_str(srv_path->state));
1880 mutex_unlock(&srv->paths_mutex);
1881 goto reject_w_err;
1882 }
1883 /*
1884 * Sanity checks
1885 */
1886 if (con_num != s->con_num || cid >= s->con_num) {
1887 rtrs_err(s, "Incorrect request: %d, %d\n",
1888 cid, con_num);
1889 mutex_unlock(&srv->paths_mutex);
1890 goto reject_w_err;
1891 }
1892 if (s->con[cid]) {
1893 rtrs_err(s, "Connection already exists: %d\n",
1894 cid);
1895 mutex_unlock(&srv->paths_mutex);
1896 goto reject_w_err;
1897 }
1898 } else {
1899 srv_path = __alloc_path(srv, cm_id, con_num, recon_cnt,
1900 &msg->sess_uuid);
1901 if (IS_ERR(srv_path)) {
1902 mutex_unlock(&srv->paths_mutex);
1903 put_srv(srv);
1904 err = PTR_ERR(srv_path);
1905 pr_err("RTRS server session allocation failed: %d\n", err);
1906 goto reject_w_err;
1907 }
1908 }
1909 err = create_con(srv_path, cm_id, cid);
1910 if (err) {
1911 rtrs_err((&srv_path->s), "create_con(), error %d\n", err);
1912 rtrs_rdma_do_reject(cm_id, err);
1913 /*
1914 * Since the session has other connections, we follow the normal
1915 * path through the workqueue, but still return an error to tell
1916 * cma.c to call rdma_destroy_id() for the current connection.
1917 */
1918 goto close_and_return_err;
1919 }
1920 err = rtrs_rdma_do_accept(srv_path, cm_id);
1921 if (err) {
1922 rtrs_err((&srv_path->s), "rtrs_rdma_do_accept(), error %d\n", err);
1923 rtrs_rdma_do_reject(cm_id, err);
1924 /*
1925 * Since the current connection was successfully added to the
1926 * session, we follow the normal path through the workqueue to
1927 * close the session, and return 0 to tell cma.c that we call
1928 * rdma_destroy_id() ourselves.
1929 */
1930 err = 0;
1931 goto close_and_return_err;
1932 }
1933 mutex_unlock(&srv->paths_mutex);
1934
1935 return 0;
1936
1937 reject_w_err:
1938 return rtrs_rdma_do_reject(cm_id, err);
1939
1940 close_and_return_err:
1941 mutex_unlock(&srv->paths_mutex);
1942 close_path(srv_path);
1943
1944 return err;
1945 }
1946
1947 static int rtrs_srv_rdma_cm_handler(struct rdma_cm_id *cm_id,
1948 struct rdma_cm_event *ev)
1949 {
1950 struct rtrs_srv_path *srv_path = NULL;
1951 struct rtrs_path *s = NULL;
1952
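/*
 * For a new connect request cm_id->context still holds the server context
 * inherited from the listening ID; for all later events it has been set to
 * the per-connection rtrs_con, from which the path is derived below.
 */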
1953 if (ev->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
1954 struct rtrs_con *c = cm_id->context;
1955
1956 s = c->path;
1957 srv_path = to_srv_path(s);
1958 }
1959
1960 switch (ev->event) {
1961 case RDMA_CM_EVENT_CONNECT_REQUEST:
1962 /*
1963 * In case of an error cma.c will destroy the cm_id;
1964 * see cma_process_remove().
1965 */
1966 return rtrs_rdma_connect(cm_id, ev->param.conn.private_data,
1967 ev->param.conn.private_data_len);
1968 case RDMA_CM_EVENT_ESTABLISHED:
1969 /* Nothing here */
1970 break;
1971 case RDMA_CM_EVENT_REJECTED:
1972 case RDMA_CM_EVENT_CONNECT_ERROR:
1973 case RDMA_CM_EVENT_UNREACHABLE:
1974 rtrs_err(s, "CM error (CM event: %s, err: %d)\n",
1975 rdma_event_msg(ev->event), ev->status);
1976 fallthrough;
1977 case RDMA_CM_EVENT_DISCONNECTED:
1978 case RDMA_CM_EVENT_ADDR_CHANGE:
1979 case RDMA_CM_EVENT_TIMEWAIT_EXIT:
1980 case RDMA_CM_EVENT_DEVICE_REMOVAL:
1981 close_path(srv_path);
1982 break;
1983 default:
1984 pr_err("Ignoring unexpected CM event %s, err %d\n",
1985 rdma_event_msg(ev->event), ev->status);
1986 break;
1987 }
1988
1989 return 0;
1990 }
1991
1992 static struct rdma_cm_id *rtrs_srv_cm_init(struct rtrs_srv_ctx *ctx,
1993 struct sockaddr *addr,
1994 enum rdma_ucm_port_space ps)
1995 {
1996 struct rdma_cm_id *cm_id;
1997 int ret;
1998
1999 cm_id = rdma_create_id(&init_net, rtrs_srv_rdma_cm_handler,
2000 ctx, ps, IB_QPT_RC);
2001 if (IS_ERR(cm_id)) {
2002 ret = PTR_ERR(cm_id);
2003 pr_err("Creating id for RDMA connection failed, err: %d\n",
2004 ret);
2005 goto err_out;
2006 }
2007 ret = rdma_bind_addr(cm_id, addr);
2008 if (ret) {
2009 pr_err("Binding RDMA address failed, err: %d\n", ret);
2010 goto err_cm;
2011 }
2012 ret = rdma_listen(cm_id, 64);
2013 if (ret) {
2014 pr_err("Listening on RDMA connection failed, err: %d\n",
2015 ret);
2016 goto err_cm;
2017 }
2018
2019 return cm_id;
2020
2021 err_cm:
2022 rdma_destroy_id(cm_id);
2023 err_out:
2024
2025 return ERR_PTR(ret);
2026 }
2027
2028 static int rtrs_srv_rdma_init(struct rtrs_srv_ctx *ctx, u16 port)
2029 {
2030 struct sockaddr_in6 sin = {
2031 .sin6_family = AF_INET6,
2032 .sin6_addr = IN6ADDR_ANY_INIT,
2033 .sin6_port = htons(port),
2034 };
2035 struct sockaddr_ib sib = {
2036 .sib_family = AF_IB,
2037 .sib_sid = cpu_to_be64(RDMA_IB_IP_PS_IB | port),
2038 .sib_sid_mask = cpu_to_be64(0xffffffffffffffffULL),
2039 .sib_pkey = cpu_to_be16(0xffff),
2040 };
2041 struct rdma_cm_id *cm_ip, *cm_ib;
2042 int ret;
2043
2044 /*
2045 * We accept both IPoIB and IB connections, so we need to keep
2046 * two CM IDs, one for each socket type and port space.
2047 * If the CM initialization of either ID fails, we abort
2048 * everything.
2049 */
2050 cm_ip = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sin, RDMA_PS_TCP);
2051 if (IS_ERR(cm_ip))
2052 return PTR_ERR(cm_ip);
2053
2054 cm_ib = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sib, RDMA_PS_IB);
2055 if (IS_ERR(cm_ib)) {
2056 ret = PTR_ERR(cm_ib);
2057 goto free_cm_ip;
2058 }
2059
2060 ctx->cm_id_ip = cm_ip;
2061 ctx->cm_id_ib = cm_ib;
2062
2063 return 0;
2064
2065 free_cm_ip:
2066 rdma_destroy_id(cm_ip);
2067
2068 return ret;
2069 }
2070
2071 static struct rtrs_srv_ctx *alloc_srv_ctx(struct rtrs_srv_ops *ops)
2072 {
2073 struct rtrs_srv_ctx *ctx;
2074
2075 ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
2076 if (!ctx)
2077 return NULL;
2078
2079 ctx->ops = *ops;
2080 mutex_init(&ctx->srv_mutex);
2081 INIT_LIST_HEAD(&ctx->srv_list);
2082
2083 return ctx;
2084 }
2085
2086 static void free_srv_ctx(struct rtrs_srv_ctx *ctx)
2087 {
2088 WARN_ON(!list_empty(&ctx->srv_list));
2089 mutex_destroy(&ctx->srv_mutex);
2090 kfree(ctx);
2091 }
2092
2093 static int rtrs_srv_add_one(struct ib_device *device)
2094 {
2095 struct rtrs_srv_ctx *ctx;
2096 int ret = 0;
2097
2098 mutex_lock(&ib_ctx.ib_dev_mutex);
2099 if (ib_ctx.ib_dev_count)
2100 goto out;
2101
2102 /*
2103 * Since our CM IDs are NOT bound to any ib device, we create them
2104 * only once, when the first device is added.
2105 */
2106 ctx = ib_ctx.srv_ctx;
2107 ret = rtrs_srv_rdma_init(ctx, ib_ctx.port);
2108 if (ret) {
2109 /*
2110 * We errored out here.
2111 * According to the IB core, if we return an error here the error
2112 * code is ignored and no further calls to our ops are made.
2113 */
2114 pr_err("Failed to initialize RDMA connection, err: %d\n", ret);
2115 goto err_out;
2116 }
2117
2118 out:
2119 /*
2120 * Keep track of the number of ib devices added
2121 */
2122 ib_ctx.ib_dev_count++;
2123
2124 err_out:
2125 mutex_unlock(&ib_ctx.ib_dev_mutex);
2126 return ret;
2127 }
2128
2129 static void rtrs_srv_remove_one(struct ib_device *device, void *client_data)
2130 {
2131 struct rtrs_srv_ctx *ctx;
2132
2133 mutex_lock(&ib_ctx.ib_dev_mutex);
2134 ib_ctx.ib_dev_count--;
2135
2136 if (ib_ctx.ib_dev_count)
2137 goto out;
2138
2139 /*
2140 * Since our CM IDs are NOT bound to any ib device, we destroy them
2141 * only once, when the last device is removed.
2142 */
2143 ctx = ib_ctx.srv_ctx;
2144 rdma_destroy_id(ctx->cm_id_ip);
2145 rdma_destroy_id(ctx->cm_id_ib);
2146
2147 out:
2148 mutex_unlock(&ib_ctx.ib_dev_mutex);
2149 }
2150
2151 static struct ib_client rtrs_srv_client = {
2152 .name = "rtrs_server",
2153 .add = rtrs_srv_add_one,
2154 .remove = rtrs_srv_remove_one
2155 };
2156
2157 /**
2158 * rtrs_srv_open() - open RTRS server context
2159 * @ops: callback functions
2160 * @port: port to listen on
2161 *
2162 * Creates a server context with the specified callbacks.
2163 *
2164 * Return: a valid pointer on success, otherwise an ERR_PTR() value.
2165 */
2166 struct rtrs_srv_ctx *rtrs_srv_open(struct rtrs_srv_ops *ops, u16 port)
2167 {
2168 struct rtrs_srv_ctx *ctx;
2169 int err;
2170
2171 ctx = alloc_srv_ctx(ops);
2172 if (!ctx)
2173 return ERR_PTR(-ENOMEM);
2174
2175 mutex_init(&ib_ctx.ib_dev_mutex);
2176 ib_ctx.srv_ctx = ctx;
2177 ib_ctx.port = port;
2178
2179 err = ib_register_client(&rtrs_srv_client);
2180 if (err) {
2181 free_srv_ctx(ctx);
2182 return ERR_PTR(err);
2183 }
2184
2185 return ctx;
2186 }
2187 EXPORT_SYMBOL(rtrs_srv_open);
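/*
 * Minimal usage sketch for a ULP (illustrative only; my_rdma_ev, my_link_ev
 * and MY_PORT are placeholder names, not part of this file):
 *
 *	static struct rtrs_srv_ops my_ops = {
 *		.rdma_ev = my_rdma_ev,
 *		.link_ev = my_link_ev,
 *	};
 *
 *	ctx = rtrs_srv_open(&my_ops, MY_PORT);
 *	if (IS_ERR(ctx))
 *		return PTR_ERR(ctx);
 *	...
 *	rtrs_srv_close(ctx);
 */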
2188
2189 static void close_paths(struct rtrs_srv_sess *srv)
2190 {
2191 struct rtrs_srv_path *srv_path;
2192
2193 mutex_lock(&srv->paths_mutex);
2194 list_for_each_entry(srv_path, &srv->paths_list, s.entry)
2195 close_path(srv_path);
2196 mutex_unlock(&srv->paths_mutex);
2197 }
2198
2199 static void close_ctx(struct rtrs_srv_ctx *ctx)
2200 {
2201 struct rtrs_srv_sess *srv;
2202
2203 mutex_lock(&ctx->srv_mutex);
2204 list_for_each_entry(srv, &ctx->srv_list, ctx_list)
2205 close_paths(srv);
2206 mutex_unlock(&ctx->srv_mutex);
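/*
 * Closing a path is asynchronous (it goes through rtrs_wq), so wait here
 * until all queued close work has finished.
 */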
2207 flush_workqueue(rtrs_wq);
2208 }
2209
2210 /**
2211 * rtrs_srv_close() - close RTRS server context
2212 * @ctx: pointer to server context
2213 *
2214 * Closes the RTRS server context along with all its client sessions.
2215 */
2216 void rtrs_srv_close(struct rtrs_srv_ctx *ctx)
2217 {
2218 ib_unregister_client(&rtrs_srv_client);
2219 mutex_destroy(&ib_ctx.ib_dev_mutex);
2220 close_ctx(ctx);
2221 free_srv_ctx(ctx);
2222 }
2223 EXPORT_SYMBOL(rtrs_srv_close);
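/*
 * Teardown order in rtrs_srv_close(): unregister the IB client first so no
 * new devices or connect requests arrive, then close every path of every
 * session, and only then free the context.
 */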
2224
2225 static int check_module_params(void)
2226 {
2227 if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) {
2228 pr_err("Invalid sess_queue_depth value %d, has to be >= %d, <= %d.\n",
2229 sess_queue_depth, 1, MAX_SESS_QUEUE_DEPTH);
2230 return -EINVAL;
2231 }
2232 if (max_chunk_size < MIN_CHUNK_SIZE || !is_power_of_2(max_chunk_size)) {
2233 pr_err("Invalid max_chunk_size value %d, has to be >= %d and should be power of two.\n",
2234 max_chunk_size, MIN_CHUNK_SIZE);
2235 return -EINVAL;
2236 }
2237
2238 /*
2239 * Check if IB immediate data size is enough to hold the mem_id and the
2240 * offset inside the memory chunk
2241 */
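/*
 * Worked example with the defaults: sess_queue_depth = 512 needs
 * ilog2(511) + 1 = 9 bits for the buffer id, and max_chunk_size = 128 KiB
 * needs ilog2(131071) + 1 = 17 bits for the offset; 9 + 17 = 26 bits must
 * fit into MAX_IMM_PAYL_BITS for the module to load.
 */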
2242 if ((ilog2(sess_queue_depth - 1) + 1) +
2243 (ilog2(max_chunk_size - 1) + 1) > MAX_IMM_PAYL_BITS) {
2244 pr_err("RDMA immediate size (%db) not enough to encode %d buffers of size %dB. Reduce 'sess_queue_depth' or 'max_chunk_size' parameters.\n",
2245 MAX_IMM_PAYL_BITS, sess_queue_depth, max_chunk_size);
2246 return -EINVAL;
2247 }
2248
2249 return 0;
2250 }
2251
2252 static int __init rtrs_server_init(void)
2253 {
2254 int err;
2255
2256 pr_info("Loading module %s, proto %s: (max_chunk_size: %d (pure IO %ld, headers %ld), sess_queue_depth: %d, always_invalidate: %d)\n",
2257 KBUILD_MODNAME, RTRS_PROTO_VER_STRING,
2258 max_chunk_size, max_chunk_size - MAX_HDR_SIZE, MAX_HDR_SIZE,
2259 sess_queue_depth, always_invalidate);
2260
2261 rtrs_rdma_dev_pd_init(0, &dev_pd);
2262
2263 err = check_module_params();
2264 if (err) {
2265 pr_err("Failed to load module, invalid module parameters, err: %d\n",
2266 err);
2267 return err;
2268 }
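/*
 * Pre-allocate enough page chunks (each of get_order(max_chunk_size) pages)
 * to cover CHUNK_POOL_SZ sessions at full queue depth.
 */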
2269 chunk_pool = mempool_create_page_pool(sess_queue_depth * CHUNK_POOL_SZ,
2270 get_order(max_chunk_size));
2271 if (!chunk_pool)
2272 return -ENOMEM;
2273 rtrs_dev_class = class_create(THIS_MODULE, "rtrs-server");
2274 if (IS_ERR(rtrs_dev_class)) {
2275 err = PTR_ERR(rtrs_dev_class);
2276 goto out_chunk_pool;
2277 }
2278 rtrs_wq = alloc_workqueue("rtrs_server_wq", 0, 0);
2279 if (!rtrs_wq) {
2280 err = -ENOMEM;
2281 goto out_dev_class;
2282 }
2283
2284 return 0;
2285
2286 out_dev_class:
2287 class_destroy(rtrs_dev_class);
2288 out_chunk_pool:
2289 mempool_destroy(chunk_pool);
2290
2291 return err;
2292 }
2293
2294 static void __exit rtrs_server_exit(void)
2295 {
2296 destroy_workqueue(rtrs_wq);
2297 class_destroy(rtrs_dev_class);
2298 mempool_destroy(chunk_pool);
2299 rtrs_rdma_dev_pd_deinit(&dev_pd);
2300 }
2301
2302 module_init(rtrs_server_init);
2303 module_exit(rtrs_server_exit);
2304