1 #include <linux/ceph/ceph_debug.h>
2 
3 #include <linux/fs.h>
4 #include <linux/wait.h>
5 #include <linux/slab.h>
6 #include <linux/sched.h>
7 #include <linux/debugfs.h>
8 #include <linux/seq_file.h>
9 
10 #include "super.h"
11 #include "mds_client.h"
12 
13 #include <linux/ceph/messenger.h>
14 #include <linux/ceph/decode.h>
15 #include <linux/ceph/pagelist.h>
16 #include <linux/ceph/auth.h>
17 #include <linux/ceph/debugfs.h>
18 
19 /*
20  * A cluster of MDS (metadata server) daemons is responsible for
21  * managing the file system namespace (the directory hierarchy and
22  * inodes) and for coordinating shared access to storage.  Metadata is
23  * partitioning hierarchically across a number of servers, and that
24  * partition varies over time as the cluster adjusts the distribution
25  * in order to balance load.
26  *
27  * The MDS client is primarily responsible to managing synchronous
28  * metadata requests for operations like open, unlink, and so forth.
29  * If there is a MDS failure, we find out about it when we (possibly
30  * request and) receive a new MDS map, and can resubmit affected
31  * requests.
32  *
33  * For the most part, though, we take advantage of a lossless
34  * communications channel to the MDS, and do not need to worry about
35  * timing out or resubmitting requests.
36  *
37  * We maintain a stateful "session" with each MDS we interact with.
38  * Within each session, we sent periodic heartbeat messages to ensure
39  * any capabilities or leases we have been issues remain valid.  If
40  * the session times out and goes stale, our leases and capabilities
41  * are no longer valid.
42  */
43 
44 struct ceph_reconnect_state {
45 	struct ceph_pagelist *pagelist;
46 	bool flock;
47 };
48 
49 static void __wake_requests(struct ceph_mds_client *mdsc,
50 			    struct list_head *head);
51 
52 static const struct ceph_connection_operations mds_con_ops;
53 
54 
55 /*
56  * mds reply parsing
57  */
58 
59 /*
60  * parse individual inode info
61  */
parse_reply_info_in(void ** p,void * end,struct ceph_mds_reply_info_in * info,int features)62 static int parse_reply_info_in(void **p, void *end,
63 			       struct ceph_mds_reply_info_in *info,
64 			       int features)
65 {
66 	int err = -EIO;
67 
68 	info->in = *p;
69 	*p += sizeof(struct ceph_mds_reply_inode) +
70 		sizeof(*info->in->fragtree.splits) *
71 		le32_to_cpu(info->in->fragtree.nsplits);
72 
73 	ceph_decode_32_safe(p, end, info->symlink_len, bad);
74 	ceph_decode_need(p, end, info->symlink_len, bad);
75 	info->symlink = *p;
76 	*p += info->symlink_len;
77 
78 	if (features & CEPH_FEATURE_DIRLAYOUTHASH)
79 		ceph_decode_copy_safe(p, end, &info->dir_layout,
80 				      sizeof(info->dir_layout), bad);
81 	else
82 		memset(&info->dir_layout, 0, sizeof(info->dir_layout));
83 
84 	ceph_decode_32_safe(p, end, info->xattr_len, bad);
85 	ceph_decode_need(p, end, info->xattr_len, bad);
86 	info->xattr_data = *p;
87 	*p += info->xattr_len;
88 	return 0;
89 bad:
90 	return err;
91 }
92 
93 /*
94  * parse a normal reply, which may contain a (dir+)dentry and/or a
95  * target inode.
96  */
parse_reply_info_trace(void ** p,void * end,struct ceph_mds_reply_info_parsed * info,int features)97 static int parse_reply_info_trace(void **p, void *end,
98 				  struct ceph_mds_reply_info_parsed *info,
99 				  int features)
100 {
101 	int err;
102 
103 	if (info->head->is_dentry) {
104 		err = parse_reply_info_in(p, end, &info->diri, features);
105 		if (err < 0)
106 			goto out_bad;
107 
108 		if (unlikely(*p + sizeof(*info->dirfrag) > end))
109 			goto bad;
110 		info->dirfrag = *p;
111 		*p += sizeof(*info->dirfrag) +
112 			sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
113 		if (unlikely(*p > end))
114 			goto bad;
115 
116 		ceph_decode_32_safe(p, end, info->dname_len, bad);
117 		ceph_decode_need(p, end, info->dname_len, bad);
118 		info->dname = *p;
119 		*p += info->dname_len;
120 		info->dlease = *p;
121 		*p += sizeof(*info->dlease);
122 	}
123 
124 	if (info->head->is_target) {
125 		err = parse_reply_info_in(p, end, &info->targeti, features);
126 		if (err < 0)
127 			goto out_bad;
128 	}
129 
130 	if (unlikely(*p != end))
131 		goto bad;
132 	return 0;
133 
134 bad:
135 	err = -EIO;
136 out_bad:
137 	pr_err("problem parsing mds trace %d\n", err);
138 	return err;
139 }
140 
141 /*
142  * parse readdir results
143  */
parse_reply_info_dir(void ** p,void * end,struct ceph_mds_reply_info_parsed * info,int features)144 static int parse_reply_info_dir(void **p, void *end,
145 				struct ceph_mds_reply_info_parsed *info,
146 				int features)
147 {
148 	u32 num, i = 0;
149 	int err;
150 
151 	info->dir_dir = *p;
152 	if (*p + sizeof(*info->dir_dir) > end)
153 		goto bad;
154 	*p += sizeof(*info->dir_dir) +
155 		sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
156 	if (*p > end)
157 		goto bad;
158 
159 	ceph_decode_need(p, end, sizeof(num) + 2, bad);
160 	num = ceph_decode_32(p);
161 	info->dir_end = ceph_decode_8(p);
162 	info->dir_complete = ceph_decode_8(p);
163 	if (num == 0)
164 		goto done;
165 
166 	/* alloc large array */
167 	info->dir_nr = num;
168 	info->dir_in = kcalloc(num, sizeof(*info->dir_in) +
169 			       sizeof(*info->dir_dname) +
170 			       sizeof(*info->dir_dname_len) +
171 			       sizeof(*info->dir_dlease),
172 			       GFP_NOFS);
173 	if (info->dir_in == NULL) {
174 		err = -ENOMEM;
175 		goto out_bad;
176 	}
177 	info->dir_dname = (void *)(info->dir_in + num);
178 	info->dir_dname_len = (void *)(info->dir_dname + num);
179 	info->dir_dlease = (void *)(info->dir_dname_len + num);
180 
181 	while (num) {
182 		/* dentry */
183 		ceph_decode_need(p, end, sizeof(u32)*2, bad);
184 		info->dir_dname_len[i] = ceph_decode_32(p);
185 		ceph_decode_need(p, end, info->dir_dname_len[i], bad);
186 		info->dir_dname[i] = *p;
187 		*p += info->dir_dname_len[i];
188 		dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
189 		     info->dir_dname[i]);
190 		info->dir_dlease[i] = *p;
191 		*p += sizeof(struct ceph_mds_reply_lease);
192 
193 		/* inode */
194 		err = parse_reply_info_in(p, end, &info->dir_in[i], features);
195 		if (err < 0)
196 			goto out_bad;
197 		i++;
198 		num--;
199 	}
200 
201 done:
202 	if (*p != end)
203 		goto bad;
204 	return 0;
205 
206 bad:
207 	err = -EIO;
208 out_bad:
209 	pr_err("problem parsing dir contents %d\n", err);
210 	return err;
211 }
212 
213 /*
214  * parse fcntl F_GETLK results
215  */
parse_reply_info_filelock(void ** p,void * end,struct ceph_mds_reply_info_parsed * info,int features)216 static int parse_reply_info_filelock(void **p, void *end,
217 				     struct ceph_mds_reply_info_parsed *info,
218 				     int features)
219 {
220 	if (*p + sizeof(*info->filelock_reply) > end)
221 		goto bad;
222 
223 	info->filelock_reply = *p;
224 	*p += sizeof(*info->filelock_reply);
225 
226 	if (unlikely(*p != end))
227 		goto bad;
228 	return 0;
229 
230 bad:
231 	return -EIO;
232 }
233 
234 /*
235  * parse extra results
236  */
parse_reply_info_extra(void ** p,void * end,struct ceph_mds_reply_info_parsed * info,int features)237 static int parse_reply_info_extra(void **p, void *end,
238 				  struct ceph_mds_reply_info_parsed *info,
239 				  int features)
240 {
241 	if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
242 		return parse_reply_info_filelock(p, end, info, features);
243 	else
244 		return parse_reply_info_dir(p, end, info, features);
245 }
246 
247 /*
248  * parse entire mds reply
249  */
parse_reply_info(struct ceph_msg * msg,struct ceph_mds_reply_info_parsed * info,int features)250 static int parse_reply_info(struct ceph_msg *msg,
251 			    struct ceph_mds_reply_info_parsed *info,
252 			    int features)
253 {
254 	void *p, *end;
255 	u32 len;
256 	int err;
257 
258 	info->head = msg->front.iov_base;
259 	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
260 	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
261 
262 	/* trace */
263 	ceph_decode_32_safe(&p, end, len, bad);
264 	if (len > 0) {
265 		err = parse_reply_info_trace(&p, p+len, info, features);
266 		if (err < 0)
267 			goto out_bad;
268 	}
269 
270 	/* extra */
271 	ceph_decode_32_safe(&p, end, len, bad);
272 	if (len > 0) {
273 		err = parse_reply_info_extra(&p, p+len, info, features);
274 		if (err < 0)
275 			goto out_bad;
276 	}
277 
278 	/* snap blob */
279 	ceph_decode_32_safe(&p, end, len, bad);
280 	info->snapblob_len = len;
281 	info->snapblob = p;
282 	p += len;
283 
284 	if (p != end)
285 		goto bad;
286 	return 0;
287 
288 bad:
289 	err = -EIO;
290 out_bad:
291 	pr_err("mds parse_reply err %d\n", err);
292 	return err;
293 }
294 
destroy_reply_info(struct ceph_mds_reply_info_parsed * info)295 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
296 {
297 	kfree(info->dir_in);
298 }
299 
300 
301 /*
302  * sessions
303  */
session_state_name(int s)304 static const char *session_state_name(int s)
305 {
306 	switch (s) {
307 	case CEPH_MDS_SESSION_NEW: return "new";
308 	case CEPH_MDS_SESSION_OPENING: return "opening";
309 	case CEPH_MDS_SESSION_OPEN: return "open";
310 	case CEPH_MDS_SESSION_HUNG: return "hung";
311 	case CEPH_MDS_SESSION_CLOSING: return "closing";
312 	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
313 	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
314 	default: return "???";
315 	}
316 }
317 
get_session(struct ceph_mds_session * s)318 static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
319 {
320 	if (atomic_inc_not_zero(&s->s_ref)) {
321 		dout("mdsc get_session %p %d -> %d\n", s,
322 		     atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
323 		return s;
324 	} else {
325 		dout("mdsc get_session %p 0 -- FAIL", s);
326 		return NULL;
327 	}
328 }
329 
ceph_put_mds_session(struct ceph_mds_session * s)330 void ceph_put_mds_session(struct ceph_mds_session *s)
331 {
332 	dout("mdsc put_session %p %d -> %d\n", s,
333 	     atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
334 	if (atomic_dec_and_test(&s->s_ref)) {
335 		if (s->s_authorizer)
336 		     s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer(
337 			     s->s_mdsc->fsc->client->monc.auth,
338 			     s->s_authorizer);
339 		kfree(s);
340 	}
341 }
342 
343 /*
344  * called under mdsc->mutex
345  */
__ceph_lookup_mds_session(struct ceph_mds_client * mdsc,int mds)346 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
347 						   int mds)
348 {
349 	struct ceph_mds_session *session;
350 
351 	if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
352 		return NULL;
353 	session = mdsc->sessions[mds];
354 	dout("lookup_mds_session %p %d\n", session,
355 	     atomic_read(&session->s_ref));
356 	get_session(session);
357 	return session;
358 }
359 
__have_session(struct ceph_mds_client * mdsc,int mds)360 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
361 {
362 	if (mds >= mdsc->max_sessions)
363 		return false;
364 	return mdsc->sessions[mds];
365 }
366 
__verify_registered_session(struct ceph_mds_client * mdsc,struct ceph_mds_session * s)367 static int __verify_registered_session(struct ceph_mds_client *mdsc,
368 				       struct ceph_mds_session *s)
369 {
370 	if (s->s_mds >= mdsc->max_sessions ||
371 	    mdsc->sessions[s->s_mds] != s)
372 		return -ENOENT;
373 	return 0;
374 }
375 
376 /*
377  * create+register a new session for given mds.
378  * called under mdsc->mutex.
379  */
register_session(struct ceph_mds_client * mdsc,int mds)380 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
381 						 int mds)
382 {
383 	struct ceph_mds_session *s;
384 
385 	s = kzalloc(sizeof(*s), GFP_NOFS);
386 	if (!s)
387 		return ERR_PTR(-ENOMEM);
388 	s->s_mdsc = mdsc;
389 	s->s_mds = mds;
390 	s->s_state = CEPH_MDS_SESSION_NEW;
391 	s->s_ttl = 0;
392 	s->s_seq = 0;
393 	mutex_init(&s->s_mutex);
394 
395 	ceph_con_init(mdsc->fsc->client->msgr, &s->s_con);
396 	s->s_con.private = s;
397 	s->s_con.ops = &mds_con_ops;
398 	s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS;
399 	s->s_con.peer_name.num = cpu_to_le64(mds);
400 
401 	spin_lock_init(&s->s_cap_lock);
402 	s->s_cap_gen = 0;
403 	s->s_cap_ttl = 0;
404 	s->s_renew_requested = 0;
405 	s->s_renew_seq = 0;
406 	INIT_LIST_HEAD(&s->s_caps);
407 	s->s_nr_caps = 0;
408 	s->s_trim_caps = 0;
409 	atomic_set(&s->s_ref, 1);
410 	INIT_LIST_HEAD(&s->s_waiting);
411 	INIT_LIST_HEAD(&s->s_unsafe);
412 	s->s_num_cap_releases = 0;
413 	s->s_cap_iterator = NULL;
414 	INIT_LIST_HEAD(&s->s_cap_releases);
415 	INIT_LIST_HEAD(&s->s_cap_releases_done);
416 	INIT_LIST_HEAD(&s->s_cap_flushing);
417 	INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
418 
419 	dout("register_session mds%d\n", mds);
420 	if (mds >= mdsc->max_sessions) {
421 		int newmax = 1 << get_count_order(mds+1);
422 		struct ceph_mds_session **sa;
423 
424 		dout("register_session realloc to %d\n", newmax);
425 		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
426 		if (sa == NULL)
427 			goto fail_realloc;
428 		if (mdsc->sessions) {
429 			memcpy(sa, mdsc->sessions,
430 			       mdsc->max_sessions * sizeof(void *));
431 			kfree(mdsc->sessions);
432 		}
433 		mdsc->sessions = sa;
434 		mdsc->max_sessions = newmax;
435 	}
436 	mdsc->sessions[mds] = s;
437 	atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
438 
439 	ceph_con_open(&s->s_con, ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
440 
441 	return s;
442 
443 fail_realloc:
444 	kfree(s);
445 	return ERR_PTR(-ENOMEM);
446 }
447 
448 /*
449  * called under mdsc->mutex
450  */
__unregister_session(struct ceph_mds_client * mdsc,struct ceph_mds_session * s)451 static void __unregister_session(struct ceph_mds_client *mdsc,
452 			       struct ceph_mds_session *s)
453 {
454 	dout("__unregister_session mds%d %p\n", s->s_mds, s);
455 	BUG_ON(mdsc->sessions[s->s_mds] != s);
456 	mdsc->sessions[s->s_mds] = NULL;
457 	ceph_con_close(&s->s_con);
458 	ceph_put_mds_session(s);
459 }
460 
461 /*
462  * drop session refs in request.
463  *
464  * should be last request ref, or hold mdsc->mutex
465  */
put_request_session(struct ceph_mds_request * req)466 static void put_request_session(struct ceph_mds_request *req)
467 {
468 	if (req->r_session) {
469 		ceph_put_mds_session(req->r_session);
470 		req->r_session = NULL;
471 	}
472 }
473 
ceph_mdsc_release_request(struct kref * kref)474 void ceph_mdsc_release_request(struct kref *kref)
475 {
476 	struct ceph_mds_request *req = container_of(kref,
477 						    struct ceph_mds_request,
478 						    r_kref);
479 	if (req->r_request)
480 		ceph_msg_put(req->r_request);
481 	if (req->r_reply) {
482 		ceph_msg_put(req->r_reply);
483 		destroy_reply_info(&req->r_reply_info);
484 	}
485 	if (req->r_inode) {
486 		ceph_put_cap_refs(ceph_inode(req->r_inode),
487 				  CEPH_CAP_PIN);
488 		iput(req->r_inode);
489 	}
490 	if (req->r_locked_dir)
491 		ceph_put_cap_refs(ceph_inode(req->r_locked_dir),
492 				  CEPH_CAP_PIN);
493 	if (req->r_target_inode)
494 		iput(req->r_target_inode);
495 	if (req->r_dentry)
496 		dput(req->r_dentry);
497 	if (req->r_old_dentry) {
498 		ceph_put_cap_refs(
499 			ceph_inode(req->r_old_dentry->d_parent->d_inode),
500 			CEPH_CAP_PIN);
501 		dput(req->r_old_dentry);
502 	}
503 	kfree(req->r_path1);
504 	kfree(req->r_path2);
505 	put_request_session(req);
506 	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
507 	kfree(req);
508 }
509 
510 /*
511  * lookup session, bump ref if found.
512  *
513  * called under mdsc->mutex.
514  */
__lookup_request(struct ceph_mds_client * mdsc,u64 tid)515 static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
516 					     u64 tid)
517 {
518 	struct ceph_mds_request *req;
519 	struct rb_node *n = mdsc->request_tree.rb_node;
520 
521 	while (n) {
522 		req = rb_entry(n, struct ceph_mds_request, r_node);
523 		if (tid < req->r_tid)
524 			n = n->rb_left;
525 		else if (tid > req->r_tid)
526 			n = n->rb_right;
527 		else {
528 			ceph_mdsc_get_request(req);
529 			return req;
530 		}
531 	}
532 	return NULL;
533 }
534 
__insert_request(struct ceph_mds_client * mdsc,struct ceph_mds_request * new)535 static void __insert_request(struct ceph_mds_client *mdsc,
536 			     struct ceph_mds_request *new)
537 {
538 	struct rb_node **p = &mdsc->request_tree.rb_node;
539 	struct rb_node *parent = NULL;
540 	struct ceph_mds_request *req = NULL;
541 
542 	while (*p) {
543 		parent = *p;
544 		req = rb_entry(parent, struct ceph_mds_request, r_node);
545 		if (new->r_tid < req->r_tid)
546 			p = &(*p)->rb_left;
547 		else if (new->r_tid > req->r_tid)
548 			p = &(*p)->rb_right;
549 		else
550 			BUG();
551 	}
552 
553 	rb_link_node(&new->r_node, parent, p);
554 	rb_insert_color(&new->r_node, &mdsc->request_tree);
555 }
556 
557 /*
558  * Register an in-flight request, and assign a tid.  Link to directory
559  * are modifying (if any).
560  *
561  * Called under mdsc->mutex.
562  */
__register_request(struct ceph_mds_client * mdsc,struct ceph_mds_request * req,struct inode * dir)563 static void __register_request(struct ceph_mds_client *mdsc,
564 			       struct ceph_mds_request *req,
565 			       struct inode *dir)
566 {
567 	req->r_tid = ++mdsc->last_tid;
568 	if (req->r_num_caps)
569 		ceph_reserve_caps(mdsc, &req->r_caps_reservation,
570 				  req->r_num_caps);
571 	dout("__register_request %p tid %lld\n", req, req->r_tid);
572 	ceph_mdsc_get_request(req);
573 	__insert_request(mdsc, req);
574 
575 	req->r_uid = current_fsuid();
576 	req->r_gid = current_fsgid();
577 
578 	if (dir) {
579 		struct ceph_inode_info *ci = ceph_inode(dir);
580 
581 		spin_lock(&ci->i_unsafe_lock);
582 		req->r_unsafe_dir = dir;
583 		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
584 		spin_unlock(&ci->i_unsafe_lock);
585 	}
586 }
587 
__unregister_request(struct ceph_mds_client * mdsc,struct ceph_mds_request * req)588 static void __unregister_request(struct ceph_mds_client *mdsc,
589 				 struct ceph_mds_request *req)
590 {
591 	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
592 	rb_erase(&req->r_node, &mdsc->request_tree);
593 	RB_CLEAR_NODE(&req->r_node);
594 
595 	if (req->r_unsafe_dir) {
596 		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
597 
598 		spin_lock(&ci->i_unsafe_lock);
599 		list_del_init(&req->r_unsafe_dir_item);
600 		spin_unlock(&ci->i_unsafe_lock);
601 	}
602 
603 	ceph_mdsc_put_request(req);
604 }
605 
606 /*
607  * Choose mds to send request to next.  If there is a hint set in the
608  * request (e.g., due to a prior forward hint from the mds), use that.
609  * Otherwise, consult frag tree and/or caps to identify the
610  * appropriate mds.  If all else fails, choose randomly.
611  *
612  * Called under mdsc->mutex.
613  */
get_nonsnap_parent(struct dentry * dentry)614 struct dentry *get_nonsnap_parent(struct dentry *dentry)
615 {
616 	while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
617 		dentry = dentry->d_parent;
618 	return dentry;
619 }
620 
__choose_mds(struct ceph_mds_client * mdsc,struct ceph_mds_request * req)621 static int __choose_mds(struct ceph_mds_client *mdsc,
622 			struct ceph_mds_request *req)
623 {
624 	struct inode *inode;
625 	struct ceph_inode_info *ci;
626 	struct ceph_cap *cap;
627 	int mode = req->r_direct_mode;
628 	int mds = -1;
629 	u32 hash = req->r_direct_hash;
630 	bool is_hash = req->r_direct_is_hash;
631 
632 	/*
633 	 * is there a specific mds we should try?  ignore hint if we have
634 	 * no session and the mds is not up (active or recovering).
635 	 */
636 	if (req->r_resend_mds >= 0 &&
637 	    (__have_session(mdsc, req->r_resend_mds) ||
638 	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
639 		dout("choose_mds using resend_mds mds%d\n",
640 		     req->r_resend_mds);
641 		return req->r_resend_mds;
642 	}
643 
644 	if (mode == USE_RANDOM_MDS)
645 		goto random;
646 
647 	inode = NULL;
648 	if (req->r_inode) {
649 		inode = req->r_inode;
650 	} else if (req->r_dentry) {
651 		struct inode *dir = req->r_dentry->d_parent->d_inode;
652 
653 		if (dir->i_sb != mdsc->fsc->sb) {
654 			/* not this fs! */
655 			inode = req->r_dentry->d_inode;
656 		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
657 			/* direct snapped/virtual snapdir requests
658 			 * based on parent dir inode */
659 			struct dentry *dn =
660 				get_nonsnap_parent(req->r_dentry->d_parent);
661 			inode = dn->d_inode;
662 			dout("__choose_mds using nonsnap parent %p\n", inode);
663 		} else if (req->r_dentry->d_inode) {
664 			/* dentry target */
665 			inode = req->r_dentry->d_inode;
666 		} else {
667 			/* dir + name */
668 			inode = dir;
669 			hash = ceph_dentry_hash(req->r_dentry);
670 			is_hash = true;
671 		}
672 	}
673 
674 	dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
675 	     (int)hash, mode);
676 	if (!inode)
677 		goto random;
678 	ci = ceph_inode(inode);
679 
680 	if (is_hash && S_ISDIR(inode->i_mode)) {
681 		struct ceph_inode_frag frag;
682 		int found;
683 
684 		ceph_choose_frag(ci, hash, &frag, &found);
685 		if (found) {
686 			if (mode == USE_ANY_MDS && frag.ndist > 0) {
687 				u8 r;
688 
689 				/* choose a random replica */
690 				get_random_bytes(&r, 1);
691 				r %= frag.ndist;
692 				mds = frag.dist[r];
693 				dout("choose_mds %p %llx.%llx "
694 				     "frag %u mds%d (%d/%d)\n",
695 				     inode, ceph_vinop(inode),
696 				     frag.frag, mds,
697 				     (int)r, frag.ndist);
698 				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
699 				    CEPH_MDS_STATE_ACTIVE)
700 					return mds;
701 			}
702 
703 			/* since this file/dir wasn't known to be
704 			 * replicated, then we want to look for the
705 			 * authoritative mds. */
706 			mode = USE_AUTH_MDS;
707 			if (frag.mds >= 0) {
708 				/* choose auth mds */
709 				mds = frag.mds;
710 				dout("choose_mds %p %llx.%llx "
711 				     "frag %u mds%d (auth)\n",
712 				     inode, ceph_vinop(inode), frag.frag, mds);
713 				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
714 				    CEPH_MDS_STATE_ACTIVE)
715 					return mds;
716 			}
717 		}
718 	}
719 
720 	spin_lock(&inode->i_lock);
721 	cap = NULL;
722 	if (mode == USE_AUTH_MDS)
723 		cap = ci->i_auth_cap;
724 	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
725 		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
726 	if (!cap) {
727 		spin_unlock(&inode->i_lock);
728 		goto random;
729 	}
730 	mds = cap->session->s_mds;
731 	dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
732 	     inode, ceph_vinop(inode), mds,
733 	     cap == ci->i_auth_cap ? "auth " : "", cap);
734 	spin_unlock(&inode->i_lock);
735 	return mds;
736 
737 random:
738 	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
739 	dout("choose_mds chose random mds%d\n", mds);
740 	return mds;
741 }
742 
743 
744 /*
745  * session messages
746  */
create_session_msg(u32 op,u64 seq)747 static struct ceph_msg *create_session_msg(u32 op, u64 seq)
748 {
749 	struct ceph_msg *msg;
750 	struct ceph_mds_session_head *h;
751 
752 	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS);
753 	if (!msg) {
754 		pr_err("create_session_msg ENOMEM creating msg\n");
755 		return NULL;
756 	}
757 	h = msg->front.iov_base;
758 	h->op = cpu_to_le32(op);
759 	h->seq = cpu_to_le64(seq);
760 	return msg;
761 }
762 
763 /*
764  * send session open request.
765  *
766  * called under mdsc->mutex
767  */
__open_session(struct ceph_mds_client * mdsc,struct ceph_mds_session * session)768 static int __open_session(struct ceph_mds_client *mdsc,
769 			  struct ceph_mds_session *session)
770 {
771 	struct ceph_msg *msg;
772 	int mstate;
773 	int mds = session->s_mds;
774 
775 	/* wait for mds to go active? */
776 	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
777 	dout("open_session to mds%d (%s)\n", mds,
778 	     ceph_mds_state_name(mstate));
779 	session->s_state = CEPH_MDS_SESSION_OPENING;
780 	session->s_renew_requested = jiffies;
781 
782 	/* send connect message */
783 	msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
784 	if (!msg)
785 		return -ENOMEM;
786 	ceph_con_send(&session->s_con, msg);
787 	return 0;
788 }
789 
790 /*
791  * open sessions for any export targets for the given mds
792  *
793  * called under mdsc->mutex
794  */
__open_export_target_sessions(struct ceph_mds_client * mdsc,struct ceph_mds_session * session)795 static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
796 					  struct ceph_mds_session *session)
797 {
798 	struct ceph_mds_info *mi;
799 	struct ceph_mds_session *ts;
800 	int i, mds = session->s_mds;
801 	int target;
802 
803 	if (mds >= mdsc->mdsmap->m_max_mds)
804 		return;
805 	mi = &mdsc->mdsmap->m_info[mds];
806 	dout("open_export_target_sessions for mds%d (%d targets)\n",
807 	     session->s_mds, mi->num_export_targets);
808 
809 	for (i = 0; i < mi->num_export_targets; i++) {
810 		target = mi->export_targets[i];
811 		ts = __ceph_lookup_mds_session(mdsc, target);
812 		if (!ts) {
813 			ts = register_session(mdsc, target);
814 			if (IS_ERR(ts))
815 				return;
816 		}
817 		if (session->s_state == CEPH_MDS_SESSION_NEW ||
818 		    session->s_state == CEPH_MDS_SESSION_CLOSING)
819 			__open_session(mdsc, session);
820 		else
821 			dout(" mds%d target mds%d %p is %s\n", session->s_mds,
822 			     i, ts, session_state_name(ts->s_state));
823 		ceph_put_mds_session(ts);
824 	}
825 }
826 
ceph_mdsc_open_export_target_sessions(struct ceph_mds_client * mdsc,struct ceph_mds_session * session)827 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
828 					   struct ceph_mds_session *session)
829 {
830 	mutex_lock(&mdsc->mutex);
831 	__open_export_target_sessions(mdsc, session);
832 	mutex_unlock(&mdsc->mutex);
833 }
834 
835 /*
836  * session caps
837  */
838 
839 /*
840  * Free preallocated cap messages assigned to this session
841  */
cleanup_cap_releases(struct ceph_mds_session * session)842 static void cleanup_cap_releases(struct ceph_mds_session *session)
843 {
844 	struct ceph_msg *msg;
845 
846 	spin_lock(&session->s_cap_lock);
847 	while (!list_empty(&session->s_cap_releases)) {
848 		msg = list_first_entry(&session->s_cap_releases,
849 				       struct ceph_msg, list_head);
850 		list_del_init(&msg->list_head);
851 		ceph_msg_put(msg);
852 	}
853 	while (!list_empty(&session->s_cap_releases_done)) {
854 		msg = list_first_entry(&session->s_cap_releases_done,
855 				       struct ceph_msg, list_head);
856 		list_del_init(&msg->list_head);
857 		ceph_msg_put(msg);
858 	}
859 	spin_unlock(&session->s_cap_lock);
860 }
861 
862 /*
863  * Helper to safely iterate over all caps associated with a session, with
864  * special care taken to handle a racing __ceph_remove_cap().
865  *
866  * Caller must hold session s_mutex.
867  */
iterate_session_caps(struct ceph_mds_session * session,int (* cb)(struct inode *,struct ceph_cap *,void *),void * arg)868 static int iterate_session_caps(struct ceph_mds_session *session,
869 				 int (*cb)(struct inode *, struct ceph_cap *,
870 					    void *), void *arg)
871 {
872 	struct list_head *p;
873 	struct ceph_cap *cap;
874 	struct inode *inode, *last_inode = NULL;
875 	struct ceph_cap *old_cap = NULL;
876 	int ret;
877 
878 	dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
879 	spin_lock(&session->s_cap_lock);
880 	p = session->s_caps.next;
881 	while (p != &session->s_caps) {
882 		cap = list_entry(p, struct ceph_cap, session_caps);
883 		inode = igrab(&cap->ci->vfs_inode);
884 		if (!inode) {
885 			p = p->next;
886 			continue;
887 		}
888 		session->s_cap_iterator = cap;
889 		spin_unlock(&session->s_cap_lock);
890 
891 		if (last_inode) {
892 			iput(last_inode);
893 			last_inode = NULL;
894 		}
895 		if (old_cap) {
896 			ceph_put_cap(session->s_mdsc, old_cap);
897 			old_cap = NULL;
898 		}
899 
900 		ret = cb(inode, cap, arg);
901 		last_inode = inode;
902 
903 		spin_lock(&session->s_cap_lock);
904 		p = p->next;
905 		if (cap->ci == NULL) {
906 			dout("iterate_session_caps  finishing cap %p removal\n",
907 			     cap);
908 			BUG_ON(cap->session != session);
909 			list_del_init(&cap->session_caps);
910 			session->s_nr_caps--;
911 			cap->session = NULL;
912 			old_cap = cap;  /* put_cap it w/o locks held */
913 		}
914 		if (ret < 0)
915 			goto out;
916 	}
917 	ret = 0;
918 out:
919 	session->s_cap_iterator = NULL;
920 	spin_unlock(&session->s_cap_lock);
921 
922 	if (last_inode)
923 		iput(last_inode);
924 	if (old_cap)
925 		ceph_put_cap(session->s_mdsc, old_cap);
926 
927 	return ret;
928 }
929 
remove_session_caps_cb(struct inode * inode,struct ceph_cap * cap,void * arg)930 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
931 				  void *arg)
932 {
933 	struct ceph_inode_info *ci = ceph_inode(inode);
934 	int drop = 0;
935 
936 	dout("removing cap %p, ci is %p, inode is %p\n",
937 	     cap, ci, &ci->vfs_inode);
938 	spin_lock(&inode->i_lock);
939 	__ceph_remove_cap(cap);
940 	if (!__ceph_is_any_real_caps(ci)) {
941 		struct ceph_mds_client *mdsc =
942 			ceph_sb_to_client(inode->i_sb)->mdsc;
943 
944 		spin_lock(&mdsc->cap_dirty_lock);
945 		if (!list_empty(&ci->i_dirty_item)) {
946 			pr_info(" dropping dirty %s state for %p %lld\n",
947 				ceph_cap_string(ci->i_dirty_caps),
948 				inode, ceph_ino(inode));
949 			ci->i_dirty_caps = 0;
950 			list_del_init(&ci->i_dirty_item);
951 			drop = 1;
952 		}
953 		if (!list_empty(&ci->i_flushing_item)) {
954 			pr_info(" dropping dirty+flushing %s state for %p %lld\n",
955 				ceph_cap_string(ci->i_flushing_caps),
956 				inode, ceph_ino(inode));
957 			ci->i_flushing_caps = 0;
958 			list_del_init(&ci->i_flushing_item);
959 			mdsc->num_cap_flushing--;
960 			drop = 1;
961 		}
962 		if (drop && ci->i_wrbuffer_ref) {
963 			pr_info(" dropping dirty data for %p %lld\n",
964 				inode, ceph_ino(inode));
965 			ci->i_wrbuffer_ref = 0;
966 			ci->i_wrbuffer_ref_head = 0;
967 			drop++;
968 		}
969 		spin_unlock(&mdsc->cap_dirty_lock);
970 	}
971 	spin_unlock(&inode->i_lock);
972 	while (drop--)
973 		iput(inode);
974 	return 0;
975 }
976 
977 /*
978  * caller must hold session s_mutex
979  */
remove_session_caps(struct ceph_mds_session * session)980 static void remove_session_caps(struct ceph_mds_session *session)
981 {
982 	dout("remove_session_caps on %p\n", session);
983 	iterate_session_caps(session, remove_session_caps_cb, NULL);
984 	BUG_ON(session->s_nr_caps > 0);
985 	BUG_ON(!list_empty(&session->s_cap_flushing));
986 	cleanup_cap_releases(session);
987 }
988 
989 /*
990  * wake up any threads waiting on this session's caps.  if the cap is
991  * old (didn't get renewed on the client reconnect), remove it now.
992  *
993  * caller must hold s_mutex.
994  */
wake_up_session_cb(struct inode * inode,struct ceph_cap * cap,void * arg)995 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
996 			      void *arg)
997 {
998 	struct ceph_inode_info *ci = ceph_inode(inode);
999 
1000 	wake_up_all(&ci->i_cap_wq);
1001 	if (arg) {
1002 		spin_lock(&inode->i_lock);
1003 		ci->i_wanted_max_size = 0;
1004 		ci->i_requested_max_size = 0;
1005 		spin_unlock(&inode->i_lock);
1006 	}
1007 	return 0;
1008 }
1009 
wake_up_session_caps(struct ceph_mds_session * session,int reconnect)1010 static void wake_up_session_caps(struct ceph_mds_session *session,
1011 				 int reconnect)
1012 {
1013 	dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1014 	iterate_session_caps(session, wake_up_session_cb,
1015 			     (void *)(unsigned long)reconnect);
1016 }
1017 
1018 /*
1019  * Send periodic message to MDS renewing all currently held caps.  The
1020  * ack will reset the expiration for all caps from this session.
1021  *
1022  * caller holds s_mutex
1023  */
send_renew_caps(struct ceph_mds_client * mdsc,struct ceph_mds_session * session)1024 static int send_renew_caps(struct ceph_mds_client *mdsc,
1025 			   struct ceph_mds_session *session)
1026 {
1027 	struct ceph_msg *msg;
1028 	int state;
1029 
1030 	if (time_after_eq(jiffies, session->s_cap_ttl) &&
1031 	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1032 		pr_info("mds%d caps stale\n", session->s_mds);
1033 	session->s_renew_requested = jiffies;
1034 
1035 	/* do not try to renew caps until a recovering mds has reconnected
1036 	 * with its clients. */
1037 	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1038 	if (state < CEPH_MDS_STATE_RECONNECT) {
1039 		dout("send_renew_caps ignoring mds%d (%s)\n",
1040 		     session->s_mds, ceph_mds_state_name(state));
1041 		return 0;
1042 	}
1043 
1044 	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1045 		ceph_mds_state_name(state));
1046 	msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1047 				 ++session->s_renew_seq);
1048 	if (!msg)
1049 		return -ENOMEM;
1050 	ceph_con_send(&session->s_con, msg);
1051 	return 0;
1052 }
1053 
1054 /*
1055  * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1056  *
1057  * Called under session->s_mutex
1058  */
renewed_caps(struct ceph_mds_client * mdsc,struct ceph_mds_session * session,int is_renew)1059 static void renewed_caps(struct ceph_mds_client *mdsc,
1060 			 struct ceph_mds_session *session, int is_renew)
1061 {
1062 	int was_stale;
1063 	int wake = 0;
1064 
1065 	spin_lock(&session->s_cap_lock);
1066 	was_stale = is_renew && (session->s_cap_ttl == 0 ||
1067 				 time_after_eq(jiffies, session->s_cap_ttl));
1068 
1069 	session->s_cap_ttl = session->s_renew_requested +
1070 		mdsc->mdsmap->m_session_timeout*HZ;
1071 
1072 	if (was_stale) {
1073 		if (time_before(jiffies, session->s_cap_ttl)) {
1074 			pr_info("mds%d caps renewed\n", session->s_mds);
1075 			wake = 1;
1076 		} else {
1077 			pr_info("mds%d caps still stale\n", session->s_mds);
1078 		}
1079 	}
1080 	dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1081 	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1082 	     time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1083 	spin_unlock(&session->s_cap_lock);
1084 
1085 	if (wake)
1086 		wake_up_session_caps(session, 0);
1087 }
1088 
1089 /*
1090  * send a session close request
1091  */
request_close_session(struct ceph_mds_client * mdsc,struct ceph_mds_session * session)1092 static int request_close_session(struct ceph_mds_client *mdsc,
1093 				 struct ceph_mds_session *session)
1094 {
1095 	struct ceph_msg *msg;
1096 
1097 	dout("request_close_session mds%d state %s seq %lld\n",
1098 	     session->s_mds, session_state_name(session->s_state),
1099 	     session->s_seq);
1100 	msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1101 	if (!msg)
1102 		return -ENOMEM;
1103 	ceph_con_send(&session->s_con, msg);
1104 	return 0;
1105 }
1106 
1107 /*
1108  * Called with s_mutex held.
1109  */
__close_session(struct ceph_mds_client * mdsc,struct ceph_mds_session * session)1110 static int __close_session(struct ceph_mds_client *mdsc,
1111 			 struct ceph_mds_session *session)
1112 {
1113 	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1114 		return 0;
1115 	session->s_state = CEPH_MDS_SESSION_CLOSING;
1116 	return request_close_session(mdsc, session);
1117 }
1118 
1119 /*
1120  * Trim old(er) caps.
1121  *
1122  * Because we can't cache an inode without one or more caps, we do
1123  * this indirectly: if a cap is unused, we prune its aliases, at which
1124  * point the inode will hopefully get dropped to.
1125  *
1126  * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1127  * memory pressure from the MDS, though, so it needn't be perfect.
1128  */
trim_caps_cb(struct inode * inode,struct ceph_cap * cap,void * arg)1129 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1130 {
1131 	struct ceph_mds_session *session = arg;
1132 	struct ceph_inode_info *ci = ceph_inode(inode);
1133 	int used, oissued, mine;
1134 
1135 	if (session->s_trim_caps <= 0)
1136 		return -1;
1137 
1138 	spin_lock(&inode->i_lock);
1139 	mine = cap->issued | cap->implemented;
1140 	used = __ceph_caps_used(ci);
1141 	oissued = __ceph_caps_issued_other(ci, cap);
1142 
1143 	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
1144 	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1145 	     ceph_cap_string(used));
1146 	if (ci->i_dirty_caps)
1147 		goto out;   /* dirty caps */
1148 	if ((used & ~oissued) & mine)
1149 		goto out;   /* we need these caps */
1150 
1151 	session->s_trim_caps--;
1152 	if (oissued) {
1153 		/* we aren't the only cap.. just remove us */
1154 		__ceph_remove_cap(cap);
1155 	} else {
1156 		/* try to drop referring dentries */
1157 		spin_unlock(&inode->i_lock);
1158 		d_prune_aliases(inode);
1159 		dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
1160 		     inode, cap, atomic_read(&inode->i_count));
1161 		return 0;
1162 	}
1163 
1164 out:
1165 	spin_unlock(&inode->i_lock);
1166 	return 0;
1167 }
1168 
1169 /*
1170  * Trim session cap count down to some max number.
1171  */
trim_caps(struct ceph_mds_client * mdsc,struct ceph_mds_session * session,int max_caps)1172 static int trim_caps(struct ceph_mds_client *mdsc,
1173 		     struct ceph_mds_session *session,
1174 		     int max_caps)
1175 {
1176 	int trim_caps = session->s_nr_caps - max_caps;
1177 
1178 	dout("trim_caps mds%d start: %d / %d, trim %d\n",
1179 	     session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1180 	if (trim_caps > 0) {
1181 		session->s_trim_caps = trim_caps;
1182 		iterate_session_caps(session, trim_caps_cb, session);
1183 		dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1184 		     session->s_mds, session->s_nr_caps, max_caps,
1185 			trim_caps - session->s_trim_caps);
1186 		session->s_trim_caps = 0;
1187 	}
1188 	return 0;
1189 }
1190 
1191 /*
1192  * Allocate cap_release messages.  If there is a partially full message
1193  * in the queue, try to allocate enough to cover it's remainder, so that
1194  * we can send it immediately.
1195  *
1196  * Called under s_mutex.
1197  */
ceph_add_cap_releases(struct ceph_mds_client * mdsc,struct ceph_mds_session * session)1198 int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
1199 			  struct ceph_mds_session *session)
1200 {
1201 	struct ceph_msg *msg, *partial = NULL;
1202 	struct ceph_mds_cap_release *head;
1203 	int err = -ENOMEM;
1204 	int extra = mdsc->fsc->mount_options->cap_release_safety;
1205 	int num;
1206 
1207 	dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
1208 	     extra);
1209 
1210 	spin_lock(&session->s_cap_lock);
1211 
1212 	if (!list_empty(&session->s_cap_releases)) {
1213 		msg = list_first_entry(&session->s_cap_releases,
1214 				       struct ceph_msg,
1215 				 list_head);
1216 		head = msg->front.iov_base;
1217 		num = le32_to_cpu(head->num);
1218 		if (num) {
1219 			dout(" partial %p with (%d/%d)\n", msg, num,
1220 			     (int)CEPH_CAPS_PER_RELEASE);
1221 			extra += CEPH_CAPS_PER_RELEASE - num;
1222 			partial = msg;
1223 		}
1224 	}
1225 	while (session->s_num_cap_releases < session->s_nr_caps + extra) {
1226 		spin_unlock(&session->s_cap_lock);
1227 		msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
1228 				   GFP_NOFS);
1229 		if (!msg)
1230 			goto out_unlocked;
1231 		dout("add_cap_releases %p msg %p now %d\n", session, msg,
1232 		     (int)msg->front.iov_len);
1233 		head = msg->front.iov_base;
1234 		head->num = cpu_to_le32(0);
1235 		msg->front.iov_len = sizeof(*head);
1236 		spin_lock(&session->s_cap_lock);
1237 		list_add(&msg->list_head, &session->s_cap_releases);
1238 		session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
1239 	}
1240 
1241 	if (partial) {
1242 		head = partial->front.iov_base;
1243 		num = le32_to_cpu(head->num);
1244 		dout(" queueing partial %p with %d/%d\n", partial, num,
1245 		     (int)CEPH_CAPS_PER_RELEASE);
1246 		list_move_tail(&partial->list_head,
1247 			       &session->s_cap_releases_done);
1248 		session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
1249 	}
1250 	err = 0;
1251 	spin_unlock(&session->s_cap_lock);
1252 out_unlocked:
1253 	return err;
1254 }
1255 
1256 /*
1257  * flush all dirty inode data to disk.
1258  *
1259  * returns true if we've flushed through want_flush_seq
1260  */
check_cap_flush(struct ceph_mds_client * mdsc,u64 want_flush_seq)1261 static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
1262 {
1263 	int mds, ret = 1;
1264 
1265 	dout("check_cap_flush want %lld\n", want_flush_seq);
1266 	mutex_lock(&mdsc->mutex);
1267 	for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
1268 		struct ceph_mds_session *session = mdsc->sessions[mds];
1269 
1270 		if (!session)
1271 			continue;
1272 		get_session(session);
1273 		mutex_unlock(&mdsc->mutex);
1274 
1275 		mutex_lock(&session->s_mutex);
1276 		if (!list_empty(&session->s_cap_flushing)) {
1277 			struct ceph_inode_info *ci =
1278 				list_entry(session->s_cap_flushing.next,
1279 					   struct ceph_inode_info,
1280 					   i_flushing_item);
1281 			struct inode *inode = &ci->vfs_inode;
1282 
1283 			spin_lock(&inode->i_lock);
1284 			if (ci->i_cap_flush_seq <= want_flush_seq) {
1285 				dout("check_cap_flush still flushing %p "
1286 				     "seq %lld <= %lld to mds%d\n", inode,
1287 				     ci->i_cap_flush_seq, want_flush_seq,
1288 				     session->s_mds);
1289 				ret = 0;
1290 			}
1291 			spin_unlock(&inode->i_lock);
1292 		}
1293 		mutex_unlock(&session->s_mutex);
1294 		ceph_put_mds_session(session);
1295 
1296 		if (!ret)
1297 			return ret;
1298 		mutex_lock(&mdsc->mutex);
1299 	}
1300 
1301 	mutex_unlock(&mdsc->mutex);
1302 	dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
1303 	return ret;
1304 }
1305 
1306 /*
1307  * called under s_mutex
1308  */
ceph_send_cap_releases(struct ceph_mds_client * mdsc,struct ceph_mds_session * session)1309 void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1310 			    struct ceph_mds_session *session)
1311 {
1312 	struct ceph_msg *msg;
1313 
1314 	dout("send_cap_releases mds%d\n", session->s_mds);
1315 	spin_lock(&session->s_cap_lock);
1316 	while (!list_empty(&session->s_cap_releases_done)) {
1317 		msg = list_first_entry(&session->s_cap_releases_done,
1318 				 struct ceph_msg, list_head);
1319 		list_del_init(&msg->list_head);
1320 		spin_unlock(&session->s_cap_lock);
1321 		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1322 		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1323 		ceph_con_send(&session->s_con, msg);
1324 		spin_lock(&session->s_cap_lock);
1325 	}
1326 	spin_unlock(&session->s_cap_lock);
1327 }
1328 
discard_cap_releases(struct ceph_mds_client * mdsc,struct ceph_mds_session * session)1329 static void discard_cap_releases(struct ceph_mds_client *mdsc,
1330 				 struct ceph_mds_session *session)
1331 {
1332 	struct ceph_msg *msg;
1333 	struct ceph_mds_cap_release *head;
1334 	unsigned num;
1335 
1336 	dout("discard_cap_releases mds%d\n", session->s_mds);
1337 	spin_lock(&session->s_cap_lock);
1338 
1339 	/* zero out the in-progress message */
1340 	msg = list_first_entry(&session->s_cap_releases,
1341 			       struct ceph_msg, list_head);
1342 	head = msg->front.iov_base;
1343 	num = le32_to_cpu(head->num);
1344 	dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
1345 	head->num = cpu_to_le32(0);
1346 	session->s_num_cap_releases += num;
1347 
1348 	/* requeue completed messages */
1349 	while (!list_empty(&session->s_cap_releases_done)) {
1350 		msg = list_first_entry(&session->s_cap_releases_done,
1351 				 struct ceph_msg, list_head);
1352 		list_del_init(&msg->list_head);
1353 
1354 		head = msg->front.iov_base;
1355 		num = le32_to_cpu(head->num);
1356 		dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
1357 		     num);
1358 		session->s_num_cap_releases += num;
1359 		head->num = cpu_to_le32(0);
1360 		msg->front.iov_len = sizeof(*head);
1361 		list_add(&msg->list_head, &session->s_cap_releases);
1362 	}
1363 
1364 	spin_unlock(&session->s_cap_lock);
1365 }
1366 
1367 /*
1368  * requests
1369  */
1370 
1371 /*
1372  * Create an mds request.
1373  */
1374 struct ceph_mds_request *
ceph_mdsc_create_request(struct ceph_mds_client * mdsc,int op,int mode)1375 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1376 {
1377 	struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1378 
1379 	if (!req)
1380 		return ERR_PTR(-ENOMEM);
1381 
1382 	mutex_init(&req->r_fill_mutex);
1383 	req->r_mdsc = mdsc;
1384 	req->r_started = jiffies;
1385 	req->r_resend_mds = -1;
1386 	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1387 	req->r_fmode = -1;
1388 	kref_init(&req->r_kref);
1389 	INIT_LIST_HEAD(&req->r_wait);
1390 	init_completion(&req->r_completion);
1391 	init_completion(&req->r_safe_completion);
1392 	INIT_LIST_HEAD(&req->r_unsafe_item);
1393 
1394 	req->r_op = op;
1395 	req->r_direct_mode = mode;
1396 	return req;
1397 }
1398 
1399 /*
1400  * return oldest (lowest) request, tid in request tree, 0 if none.
1401  *
1402  * called under mdsc->mutex.
1403  */
__get_oldest_req(struct ceph_mds_client * mdsc)1404 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1405 {
1406 	if (RB_EMPTY_ROOT(&mdsc->request_tree))
1407 		return NULL;
1408 	return rb_entry(rb_first(&mdsc->request_tree),
1409 			struct ceph_mds_request, r_node);
1410 }
1411 
__get_oldest_tid(struct ceph_mds_client * mdsc)1412 static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1413 {
1414 	struct ceph_mds_request *req = __get_oldest_req(mdsc);
1415 
1416 	if (req)
1417 		return req->r_tid;
1418 	return 0;
1419 }
1420 
1421 /*
1422  * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
1423  * on build_path_from_dentry in fs/cifs/dir.c.
1424  *
1425  * If @stop_on_nosnap, generate path relative to the first non-snapped
1426  * inode.
1427  *
1428  * Encode hidden .snap dirs as a double /, i.e.
1429  *   foo/.snap/bar -> foo//bar
1430  */
ceph_mdsc_build_path(struct dentry * dentry,int * plen,u64 * base,int stop_on_nosnap)1431 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1432 			   int stop_on_nosnap)
1433 {
1434 	struct dentry *temp;
1435 	char *path;
1436 	int len, pos;
1437 
1438 	if (dentry == NULL)
1439 		return ERR_PTR(-EINVAL);
1440 
1441 retry:
1442 	len = 0;
1443 	for (temp = dentry; !IS_ROOT(temp);) {
1444 		struct inode *inode = temp->d_inode;
1445 		if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
1446 			len++;  /* slash only */
1447 		else if (stop_on_nosnap && inode &&
1448 			 ceph_snap(inode) == CEPH_NOSNAP)
1449 			break;
1450 		else
1451 			len += 1 + temp->d_name.len;
1452 		temp = temp->d_parent;
1453 		if (temp == NULL) {
1454 			pr_err("build_path corrupt dentry %p\n", dentry);
1455 			return ERR_PTR(-EINVAL);
1456 		}
1457 	}
1458 	if (len)
1459 		len--;  /* no leading '/' */
1460 
1461 	path = kmalloc(len+1, GFP_NOFS);
1462 	if (path == NULL)
1463 		return ERR_PTR(-ENOMEM);
1464 	pos = len;
1465 	path[pos] = 0;	/* trailing null */
1466 	for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
1467 		struct inode *inode = temp->d_inode;
1468 
1469 		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1470 			dout("build_path path+%d: %p SNAPDIR\n",
1471 			     pos, temp);
1472 		} else if (stop_on_nosnap && inode &&
1473 			   ceph_snap(inode) == CEPH_NOSNAP) {
1474 			break;
1475 		} else {
1476 			pos -= temp->d_name.len;
1477 			if (pos < 0)
1478 				break;
1479 			strncpy(path + pos, temp->d_name.name,
1480 				temp->d_name.len);
1481 		}
1482 		if (pos)
1483 			path[--pos] = '/';
1484 		temp = temp->d_parent;
1485 		if (temp == NULL) {
1486 			pr_err("build_path corrupt dentry\n");
1487 			kfree(path);
1488 			return ERR_PTR(-EINVAL);
1489 		}
1490 	}
1491 	if (pos != 0) {
1492 		pr_err("build_path did not end path lookup where "
1493 		       "expected, namelen is %d, pos is %d\n", len, pos);
1494 		/* presumably this is only possible if racing with a
1495 		   rename of one of the parent directories (we can not
1496 		   lock the dentries above us to prevent this, but
1497 		   retrying should be harmless) */
1498 		kfree(path);
1499 		goto retry;
1500 	}
1501 
1502 	*base = ceph_ino(temp->d_inode);
1503 	*plen = len;
1504 	dout("build_path on %p %d built %llx '%.*s'\n",
1505 	     dentry, dentry->d_count, *base, len, path);
1506 	return path;
1507 }
1508 
build_dentry_path(struct dentry * dentry,const char ** ppath,int * ppathlen,u64 * pino,int * pfreepath)1509 static int build_dentry_path(struct dentry *dentry,
1510 			     const char **ppath, int *ppathlen, u64 *pino,
1511 			     int *pfreepath)
1512 {
1513 	char *path;
1514 
1515 	if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) {
1516 		*pino = ceph_ino(dentry->d_parent->d_inode);
1517 		*ppath = dentry->d_name.name;
1518 		*ppathlen = dentry->d_name.len;
1519 		return 0;
1520 	}
1521 	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1522 	if (IS_ERR(path))
1523 		return PTR_ERR(path);
1524 	*ppath = path;
1525 	*pfreepath = 1;
1526 	return 0;
1527 }
1528 
build_inode_path(struct inode * inode,const char ** ppath,int * ppathlen,u64 * pino,int * pfreepath)1529 static int build_inode_path(struct inode *inode,
1530 			    const char **ppath, int *ppathlen, u64 *pino,
1531 			    int *pfreepath)
1532 {
1533 	struct dentry *dentry;
1534 	char *path;
1535 
1536 	if (ceph_snap(inode) == CEPH_NOSNAP) {
1537 		*pino = ceph_ino(inode);
1538 		*ppathlen = 0;
1539 		return 0;
1540 	}
1541 	dentry = d_find_alias(inode);
1542 	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1543 	dput(dentry);
1544 	if (IS_ERR(path))
1545 		return PTR_ERR(path);
1546 	*ppath = path;
1547 	*pfreepath = 1;
1548 	return 0;
1549 }
1550 
1551 /*
1552  * request arguments may be specified via an inode *, a dentry *, or
1553  * an explicit ino+path.
1554  */
set_request_path_attr(struct inode * rinode,struct dentry * rdentry,const char * rpath,u64 rino,const char ** ppath,int * pathlen,u64 * ino,int * freepath)1555 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1556 				  const char *rpath, u64 rino,
1557 				  const char **ppath, int *pathlen,
1558 				  u64 *ino, int *freepath)
1559 {
1560 	int r = 0;
1561 
1562 	if (rinode) {
1563 		r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1564 		dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1565 		     ceph_snap(rinode));
1566 	} else if (rdentry) {
1567 		r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
1568 		dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1569 		     *ppath);
1570 	} else if (rpath) {
1571 		*ino = rino;
1572 		*ppath = rpath;
1573 		*pathlen = strlen(rpath);
1574 		dout(" path %.*s\n", *pathlen, rpath);
1575 	}
1576 
1577 	return r;
1578 }
1579 
1580 /*
1581  * called under mdsc->mutex
1582  */
create_request_message(struct ceph_mds_client * mdsc,struct ceph_mds_request * req,int mds)1583 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1584 					       struct ceph_mds_request *req,
1585 					       int mds)
1586 {
1587 	struct ceph_msg *msg;
1588 	struct ceph_mds_request_head *head;
1589 	const char *path1 = NULL;
1590 	const char *path2 = NULL;
1591 	u64 ino1 = 0, ino2 = 0;
1592 	int pathlen1 = 0, pathlen2 = 0;
1593 	int freepath1 = 0, freepath2 = 0;
1594 	int len;
1595 	u16 releases;
1596 	void *p, *end;
1597 	int ret;
1598 
1599 	ret = set_request_path_attr(req->r_inode, req->r_dentry,
1600 			      req->r_path1, req->r_ino1.ino,
1601 			      &path1, &pathlen1, &ino1, &freepath1);
1602 	if (ret < 0) {
1603 		msg = ERR_PTR(ret);
1604 		goto out;
1605 	}
1606 
1607 	ret = set_request_path_attr(NULL, req->r_old_dentry,
1608 			      req->r_path2, req->r_ino2.ino,
1609 			      &path2, &pathlen2, &ino2, &freepath2);
1610 	if (ret < 0) {
1611 		msg = ERR_PTR(ret);
1612 		goto out_free1;
1613 	}
1614 
1615 	len = sizeof(*head) +
1616 		pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64));
1617 
1618 	/* calculate (max) length for cap releases */
1619 	len += sizeof(struct ceph_mds_request_release) *
1620 		(!!req->r_inode_drop + !!req->r_dentry_drop +
1621 		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
1622 	if (req->r_dentry_drop)
1623 		len += req->r_dentry->d_name.len;
1624 	if (req->r_old_dentry_drop)
1625 		len += req->r_old_dentry->d_name.len;
1626 
1627 	msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS);
1628 	if (!msg) {
1629 		msg = ERR_PTR(-ENOMEM);
1630 		goto out_free2;
1631 	}
1632 
1633 	msg->hdr.tid = cpu_to_le64(req->r_tid);
1634 
1635 	head = msg->front.iov_base;
1636 	p = msg->front.iov_base + sizeof(*head);
1637 	end = msg->front.iov_base + msg->front.iov_len;
1638 
1639 	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
1640 	head->op = cpu_to_le32(req->r_op);
1641 	head->caller_uid = cpu_to_le32(req->r_uid);
1642 	head->caller_gid = cpu_to_le32(req->r_gid);
1643 	head->args = req->r_args;
1644 
1645 	ceph_encode_filepath(&p, end, ino1, path1);
1646 	ceph_encode_filepath(&p, end, ino2, path2);
1647 
1648 	/* make note of release offset, in case we need to replay */
1649 	req->r_request_release_offset = p - msg->front.iov_base;
1650 
1651 	/* cap releases */
1652 	releases = 0;
1653 	if (req->r_inode_drop)
1654 		releases += ceph_encode_inode_release(&p,
1655 		      req->r_inode ? req->r_inode : req->r_dentry->d_inode,
1656 		      mds, req->r_inode_drop, req->r_inode_unless, 0);
1657 	if (req->r_dentry_drop)
1658 		releases += ceph_encode_dentry_release(&p, req->r_dentry,
1659 		       mds, req->r_dentry_drop, req->r_dentry_unless);
1660 	if (req->r_old_dentry_drop)
1661 		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
1662 		       mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
1663 	if (req->r_old_inode_drop)
1664 		releases += ceph_encode_inode_release(&p,
1665 		      req->r_old_dentry->d_inode,
1666 		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
1667 	head->num_releases = cpu_to_le16(releases);
1668 
1669 	BUG_ON(p > end);
1670 	msg->front.iov_len = p - msg->front.iov_base;
1671 	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1672 
1673 	msg->pages = req->r_pages;
1674 	msg->nr_pages = req->r_num_pages;
1675 	msg->hdr.data_len = cpu_to_le32(req->r_data_len);
1676 	msg->hdr.data_off = cpu_to_le16(0);
1677 
1678 out_free2:
1679 	if (freepath2)
1680 		kfree((char *)path2);
1681 out_free1:
1682 	if (freepath1)
1683 		kfree((char *)path1);
1684 out:
1685 	return msg;
1686 }
1687 
1688 /*
1689  * called under mdsc->mutex if error, under no mutex if
1690  * success.
1691  */
complete_request(struct ceph_mds_client * mdsc,struct ceph_mds_request * req)1692 static void complete_request(struct ceph_mds_client *mdsc,
1693 			     struct ceph_mds_request *req)
1694 {
1695 	if (req->r_callback)
1696 		req->r_callback(mdsc, req);
1697 	else
1698 		complete_all(&req->r_completion);
1699 }
1700 
1701 /*
1702  * called under mdsc->mutex
1703  */
__prepare_send_request(struct ceph_mds_client * mdsc,struct ceph_mds_request * req,int mds)1704 static int __prepare_send_request(struct ceph_mds_client *mdsc,
1705 				  struct ceph_mds_request *req,
1706 				  int mds)
1707 {
1708 	struct ceph_mds_request_head *rhead;
1709 	struct ceph_msg *msg;
1710 	int flags = 0;
1711 
1712 	req->r_attempts++;
1713 	if (req->r_inode) {
1714 		struct ceph_cap *cap =
1715 			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
1716 
1717 		if (cap)
1718 			req->r_sent_on_mseq = cap->mseq;
1719 		else
1720 			req->r_sent_on_mseq = -1;
1721 	}
1722 	dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
1723 	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
1724 
1725 	if (req->r_got_unsafe) {
1726 		/*
1727 		 * Replay.  Do not regenerate message (and rebuild
1728 		 * paths, etc.); just use the original message.
1729 		 * Rebuilding paths will break for renames because
1730 		 * d_move mangles the src name.
1731 		 */
1732 		msg = req->r_request;
1733 		rhead = msg->front.iov_base;
1734 
1735 		flags = le32_to_cpu(rhead->flags);
1736 		flags |= CEPH_MDS_FLAG_REPLAY;
1737 		rhead->flags = cpu_to_le32(flags);
1738 
1739 		if (req->r_target_inode)
1740 			rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
1741 
1742 		rhead->num_retry = req->r_attempts - 1;
1743 
1744 		/* remove cap/dentry releases from message */
1745 		rhead->num_releases = 0;
1746 		msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset);
1747 		msg->front.iov_len = req->r_request_release_offset;
1748 		return 0;
1749 	}
1750 
1751 	if (req->r_request) {
1752 		ceph_msg_put(req->r_request);
1753 		req->r_request = NULL;
1754 	}
1755 	msg = create_request_message(mdsc, req, mds);
1756 	if (IS_ERR(msg)) {
1757 		req->r_err = PTR_ERR(msg);
1758 		complete_request(mdsc, req);
1759 		return PTR_ERR(msg);
1760 	}
1761 	req->r_request = msg;
1762 
1763 	rhead = msg->front.iov_base;
1764 	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
1765 	if (req->r_got_unsafe)
1766 		flags |= CEPH_MDS_FLAG_REPLAY;
1767 	if (req->r_locked_dir)
1768 		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
1769 	rhead->flags = cpu_to_le32(flags);
1770 	rhead->num_fwd = req->r_num_fwd;
1771 	rhead->num_retry = req->r_attempts - 1;
1772 	rhead->ino = 0;
1773 
1774 	dout(" r_locked_dir = %p\n", req->r_locked_dir);
1775 	return 0;
1776 }
1777 
1778 /*
1779  * send request, or put it on the appropriate wait list.
1780  */
__do_request(struct ceph_mds_client * mdsc,struct ceph_mds_request * req)1781 static int __do_request(struct ceph_mds_client *mdsc,
1782 			struct ceph_mds_request *req)
1783 {
1784 	struct ceph_mds_session *session = NULL;
1785 	int mds = -1;
1786 	int err = -EAGAIN;
1787 
1788 	if (req->r_err || req->r_got_result)
1789 		goto out;
1790 
1791 	if (req->r_timeout &&
1792 	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
1793 		dout("do_request timed out\n");
1794 		err = -EIO;
1795 		goto finish;
1796 	}
1797 
1798 	put_request_session(req);
1799 
1800 	mds = __choose_mds(mdsc, req);
1801 	if (mds < 0 ||
1802 	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
1803 		dout("do_request no mds or not active, waiting for map\n");
1804 		list_add(&req->r_wait, &mdsc->waiting_for_map);
1805 		goto out;
1806 	}
1807 
1808 	/* get, open session */
1809 	session = __ceph_lookup_mds_session(mdsc, mds);
1810 	if (!session) {
1811 		session = register_session(mdsc, mds);
1812 		if (IS_ERR(session)) {
1813 			err = PTR_ERR(session);
1814 			goto finish;
1815 		}
1816 	}
1817 	req->r_session = get_session(session);
1818 
1819 	dout("do_request mds%d session %p state %s\n", mds, session,
1820 	     session_state_name(session->s_state));
1821 	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
1822 	    session->s_state != CEPH_MDS_SESSION_HUNG) {
1823 		if (session->s_state == CEPH_MDS_SESSION_NEW ||
1824 		    session->s_state == CEPH_MDS_SESSION_CLOSING)
1825 			__open_session(mdsc, session);
1826 		list_add(&req->r_wait, &session->s_waiting);
1827 		goto out_session;
1828 	}
1829 
1830 	/* send request */
1831 	req->r_resend_mds = -1;   /* forget any previous mds hint */
1832 
1833 	if (req->r_request_started == 0)   /* note request start time */
1834 		req->r_request_started = jiffies;
1835 
1836 	err = __prepare_send_request(mdsc, req, mds);
1837 	if (!err) {
1838 		ceph_msg_get(req->r_request);
1839 		ceph_con_send(&session->s_con, req->r_request);
1840 	}
1841 
1842 out_session:
1843 	ceph_put_mds_session(session);
1844 out:
1845 	return err;
1846 
1847 finish:
1848 	req->r_err = err;
1849 	complete_request(mdsc, req);
1850 	goto out;
1851 }
1852 
1853 /*
1854  * called under mdsc->mutex
1855  */
__wake_requests(struct ceph_mds_client * mdsc,struct list_head * head)1856 static void __wake_requests(struct ceph_mds_client *mdsc,
1857 			    struct list_head *head)
1858 {
1859 	struct ceph_mds_request *req, *nreq;
1860 
1861 	list_for_each_entry_safe(req, nreq, head, r_wait) {
1862 		list_del_init(&req->r_wait);
1863 		__do_request(mdsc, req);
1864 	}
1865 }
1866 
1867 /*
1868  * Wake up threads with requests pending for @mds, so that they can
1869  * resubmit their requests to a possibly different mds.
1870  */
kick_requests(struct ceph_mds_client * mdsc,int mds)1871 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
1872 {
1873 	struct ceph_mds_request *req;
1874 	struct rb_node *p;
1875 
1876 	dout("kick_requests mds%d\n", mds);
1877 	for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) {
1878 		req = rb_entry(p, struct ceph_mds_request, r_node);
1879 		if (req->r_got_unsafe)
1880 			continue;
1881 		if (req->r_session &&
1882 		    req->r_session->s_mds == mds) {
1883 			dout(" kicking tid %llu\n", req->r_tid);
1884 			__do_request(mdsc, req);
1885 		}
1886 	}
1887 }
1888 
ceph_mdsc_submit_request(struct ceph_mds_client * mdsc,struct ceph_mds_request * req)1889 void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
1890 			      struct ceph_mds_request *req)
1891 {
1892 	dout("submit_request on %p\n", req);
1893 	mutex_lock(&mdsc->mutex);
1894 	__register_request(mdsc, req, NULL);
1895 	__do_request(mdsc, req);
1896 	mutex_unlock(&mdsc->mutex);
1897 }
1898 
1899 /*
1900  * Synchrously perform an mds request.  Take care of all of the
1901  * session setup, forwarding, retry details.
1902  */
ceph_mdsc_do_request(struct ceph_mds_client * mdsc,struct inode * dir,struct ceph_mds_request * req)1903 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
1904 			 struct inode *dir,
1905 			 struct ceph_mds_request *req)
1906 {
1907 	int err;
1908 
1909 	dout("do_request on %p\n", req);
1910 
1911 	/* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
1912 	if (req->r_inode)
1913 		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
1914 	if (req->r_locked_dir)
1915 		ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
1916 	if (req->r_old_dentry)
1917 		ceph_get_cap_refs(
1918 			ceph_inode(req->r_old_dentry->d_parent->d_inode),
1919 			CEPH_CAP_PIN);
1920 
1921 	/* issue */
1922 	mutex_lock(&mdsc->mutex);
1923 	__register_request(mdsc, req, dir);
1924 	__do_request(mdsc, req);
1925 
1926 	if (req->r_err) {
1927 		err = req->r_err;
1928 		__unregister_request(mdsc, req);
1929 		dout("do_request early error %d\n", err);
1930 		goto out;
1931 	}
1932 
1933 	/* wait */
1934 	mutex_unlock(&mdsc->mutex);
1935 	dout("do_request waiting\n");
1936 	if (req->r_timeout) {
1937 		err = (long)wait_for_completion_killable_timeout(
1938 			&req->r_completion, req->r_timeout);
1939 		if (err == 0)
1940 			err = -EIO;
1941 	} else {
1942 		err = wait_for_completion_killable(&req->r_completion);
1943 	}
1944 	dout("do_request waited, got %d\n", err);
1945 	mutex_lock(&mdsc->mutex);
1946 
1947 	/* only abort if we didn't race with a real reply */
1948 	if (req->r_got_result) {
1949 		err = le32_to_cpu(req->r_reply_info.head->result);
1950 	} else if (err < 0) {
1951 		dout("aborted request %lld with %d\n", req->r_tid, err);
1952 
1953 		/*
1954 		 * ensure we aren't running concurrently with
1955 		 * ceph_fill_trace or ceph_readdir_prepopulate, which
1956 		 * rely on locks (dir mutex) held by our caller.
1957 		 */
1958 		mutex_lock(&req->r_fill_mutex);
1959 		req->r_err = err;
1960 		req->r_aborted = true;
1961 		mutex_unlock(&req->r_fill_mutex);
1962 
1963 		if (req->r_locked_dir &&
1964 		    (req->r_op & CEPH_MDS_OP_WRITE))
1965 			ceph_invalidate_dir_request(req);
1966 	} else {
1967 		err = req->r_err;
1968 	}
1969 
1970 out:
1971 	mutex_unlock(&mdsc->mutex);
1972 	dout("do_request %p done, result %d\n", req, err);
1973 	return err;
1974 }
1975 
1976 /*
1977  * Invalidate dir I_COMPLETE, dentry lease state on an aborted MDS
1978  * namespace request.
1979  */
ceph_invalidate_dir_request(struct ceph_mds_request * req)1980 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
1981 {
1982 	struct inode *inode = req->r_locked_dir;
1983 	struct ceph_inode_info *ci = ceph_inode(inode);
1984 
1985 	dout("invalidate_dir_request %p (I_COMPLETE, lease(s))\n", inode);
1986 	spin_lock(&inode->i_lock);
1987 	ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
1988 	ci->i_release_count++;
1989 	spin_unlock(&inode->i_lock);
1990 
1991 	if (req->r_dentry)
1992 		ceph_invalidate_dentry_lease(req->r_dentry);
1993 	if (req->r_old_dentry)
1994 		ceph_invalidate_dentry_lease(req->r_old_dentry);
1995 }
1996 
1997 /*
1998  * Handle mds reply.
1999  *
2000  * We take the session mutex and parse and process the reply immediately.
2001  * This preserves the logical ordering of replies, capabilities, etc., sent
2002  * by the MDS as they are applied to our local cache.
2003  */
handle_reply(struct ceph_mds_session * session,struct ceph_msg * msg)2004 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2005 {
2006 	struct ceph_mds_client *mdsc = session->s_mdsc;
2007 	struct ceph_mds_request *req;
2008 	struct ceph_mds_reply_head *head = msg->front.iov_base;
2009 	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
2010 	u64 tid;
2011 	int err, result;
2012 	int mds = session->s_mds;
2013 
2014 	if (msg->front.iov_len < sizeof(*head)) {
2015 		pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2016 		ceph_msg_dump(msg);
2017 		return;
2018 	}
2019 
2020 	/* get request, session */
2021 	tid = le64_to_cpu(msg->hdr.tid);
2022 	mutex_lock(&mdsc->mutex);
2023 	req = __lookup_request(mdsc, tid);
2024 	if (!req) {
2025 		dout("handle_reply on unknown tid %llu\n", tid);
2026 		mutex_unlock(&mdsc->mutex);
2027 		return;
2028 	}
2029 	dout("handle_reply %p\n", req);
2030 
2031 	/* correct session? */
2032 	if (req->r_session != session) {
2033 		pr_err("mdsc_handle_reply got %llu on session mds%d"
2034 		       " not mds%d\n", tid, session->s_mds,
2035 		       req->r_session ? req->r_session->s_mds : -1);
2036 		mutex_unlock(&mdsc->mutex);
2037 		goto out;
2038 	}
2039 
2040 	/* dup? */
2041 	if ((req->r_got_unsafe && !head->safe) ||
2042 	    (req->r_got_safe && head->safe)) {
2043 		pr_warning("got a dup %s reply on %llu from mds%d\n",
2044 			   head->safe ? "safe" : "unsafe", tid, mds);
2045 		mutex_unlock(&mdsc->mutex);
2046 		goto out;
2047 	}
2048 	if (req->r_got_safe && !head->safe) {
2049 		pr_warning("got unsafe after safe on %llu from mds%d\n",
2050 			   tid, mds);
2051 		mutex_unlock(&mdsc->mutex);
2052 		goto out;
2053 	}
2054 
2055 	result = le32_to_cpu(head->result);
2056 
2057 	/*
2058 	 * Handle an ESTALE
2059 	 * if we're not talking to the authority, send to them
2060 	 * if the authority has changed while we weren't looking,
2061 	 * send to new authority
2062 	 * Otherwise we just have to return an ESTALE
2063 	 */
2064 	if (result == -ESTALE) {
2065 		dout("got ESTALE on request %llu", req->r_tid);
2066 		if (!req->r_inode) {
2067 			/* do nothing; not an authority problem */
2068 		} else if (req->r_direct_mode != USE_AUTH_MDS) {
2069 			dout("not using auth, setting for that now");
2070 			req->r_direct_mode = USE_AUTH_MDS;
2071 			__do_request(mdsc, req);
2072 			mutex_unlock(&mdsc->mutex);
2073 			goto out;
2074 		} else  {
2075 			struct ceph_inode_info *ci = ceph_inode(req->r_inode);
2076 			struct ceph_cap *cap = NULL;
2077 
2078 			if (req->r_session)
2079 				cap = ceph_get_cap_for_mds(ci,
2080 						   req->r_session->s_mds);
2081 
2082 			dout("already using auth");
2083 			if ((!cap || cap != ci->i_auth_cap) ||
2084 			    (cap->mseq != req->r_sent_on_mseq)) {
2085 				dout("but cap changed, so resending");
2086 				__do_request(mdsc, req);
2087 				mutex_unlock(&mdsc->mutex);
2088 				goto out;
2089 			}
2090 		}
2091 		dout("have to return ESTALE on request %llu", req->r_tid);
2092 	}
2093 
2094 
2095 	if (head->safe) {
2096 		req->r_got_safe = true;
2097 		__unregister_request(mdsc, req);
2098 		complete_all(&req->r_safe_completion);
2099 
2100 		if (req->r_got_unsafe) {
2101 			/*
2102 			 * We already handled the unsafe response, now do the
2103 			 * cleanup.  No need to examine the response; the MDS
2104 			 * doesn't include any result info in the safe
2105 			 * response.  And even if it did, there is nothing
2106 			 * useful we could do with a revised return value.
2107 			 */
2108 			dout("got safe reply %llu, mds%d\n", tid, mds);
2109 			list_del_init(&req->r_unsafe_item);
2110 
2111 			/* last unsafe request during umount? */
2112 			if (mdsc->stopping && !__get_oldest_req(mdsc))
2113 				complete_all(&mdsc->safe_umount_waiters);
2114 			mutex_unlock(&mdsc->mutex);
2115 			goto out;
2116 		}
2117 	} else {
2118 		req->r_got_unsafe = true;
2119 		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2120 	}
2121 
2122 	dout("handle_reply tid %lld result %d\n", tid, result);
2123 	rinfo = &req->r_reply_info;
2124 	err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2125 	mutex_unlock(&mdsc->mutex);
2126 
2127 	mutex_lock(&session->s_mutex);
2128 	if (err < 0) {
2129 		pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2130 		ceph_msg_dump(msg);
2131 		goto out_err;
2132 	}
2133 
2134 	/* snap trace */
2135 	if (rinfo->snapblob_len) {
2136 		down_write(&mdsc->snap_rwsem);
2137 		ceph_update_snap_trace(mdsc, rinfo->snapblob,
2138 			       rinfo->snapblob + rinfo->snapblob_len,
2139 			       le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
2140 		downgrade_write(&mdsc->snap_rwsem);
2141 	} else {
2142 		down_read(&mdsc->snap_rwsem);
2143 	}
2144 
2145 	/* insert trace into our cache */
2146 	mutex_lock(&req->r_fill_mutex);
2147 	err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
2148 	if (err == 0) {
2149 		if (result == 0 && req->r_op != CEPH_MDS_OP_GETFILELOCK &&
2150 		    rinfo->dir_nr)
2151 			ceph_readdir_prepopulate(req, req->r_session);
2152 		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2153 	}
2154 	mutex_unlock(&req->r_fill_mutex);
2155 
2156 	up_read(&mdsc->snap_rwsem);
2157 out_err:
2158 	mutex_lock(&mdsc->mutex);
2159 	if (!req->r_aborted) {
2160 		if (err) {
2161 			req->r_err = err;
2162 		} else {
2163 			req->r_reply = msg;
2164 			ceph_msg_get(msg);
2165 			req->r_got_result = true;
2166 		}
2167 	} else {
2168 		dout("reply arrived after request %lld was aborted\n", tid);
2169 	}
2170 	mutex_unlock(&mdsc->mutex);
2171 
2172 	ceph_add_cap_releases(mdsc, req->r_session);
2173 	mutex_unlock(&session->s_mutex);
2174 
2175 	/* kick calling process */
2176 	complete_request(mdsc, req);
2177 out:
2178 	ceph_mdsc_put_request(req);
2179 	return;
2180 }
2181 
2182 
2183 
2184 /*
2185  * handle mds notification that our request has been forwarded.
2186  */
handle_forward(struct ceph_mds_client * mdsc,struct ceph_mds_session * session,struct ceph_msg * msg)2187 static void handle_forward(struct ceph_mds_client *mdsc,
2188 			   struct ceph_mds_session *session,
2189 			   struct ceph_msg *msg)
2190 {
2191 	struct ceph_mds_request *req;
2192 	u64 tid = le64_to_cpu(msg->hdr.tid);
2193 	u32 next_mds;
2194 	u32 fwd_seq;
2195 	int err = -EINVAL;
2196 	void *p = msg->front.iov_base;
2197 	void *end = p + msg->front.iov_len;
2198 
2199 	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2200 	next_mds = ceph_decode_32(&p);
2201 	fwd_seq = ceph_decode_32(&p);
2202 
2203 	mutex_lock(&mdsc->mutex);
2204 	req = __lookup_request(mdsc, tid);
2205 	if (!req) {
2206 		dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2207 		goto out;  /* dup reply? */
2208 	}
2209 
2210 	if (req->r_aborted) {
2211 		dout("forward tid %llu aborted, unregistering\n", tid);
2212 		__unregister_request(mdsc, req);
2213 	} else if (fwd_seq <= req->r_num_fwd) {
2214 		dout("forward tid %llu to mds%d - old seq %d <= %d\n",
2215 		     tid, next_mds, req->r_num_fwd, fwd_seq);
2216 	} else {
2217 		/* resend. forward race not possible; mds would drop */
2218 		dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2219 		BUG_ON(req->r_err);
2220 		BUG_ON(req->r_got_result);
2221 		req->r_num_fwd = fwd_seq;
2222 		req->r_resend_mds = next_mds;
2223 		put_request_session(req);
2224 		__do_request(mdsc, req);
2225 	}
2226 	ceph_mdsc_put_request(req);
2227 out:
2228 	mutex_unlock(&mdsc->mutex);
2229 	return;
2230 
2231 bad:
2232 	pr_err("mdsc_handle_forward decode error err=%d\n", err);
2233 }
2234 
2235 /*
2236  * handle a mds session control message
2237  */
handle_session(struct ceph_mds_session * session,struct ceph_msg * msg)2238 static void handle_session(struct ceph_mds_session *session,
2239 			   struct ceph_msg *msg)
2240 {
2241 	struct ceph_mds_client *mdsc = session->s_mdsc;
2242 	u32 op;
2243 	u64 seq;
2244 	int mds = session->s_mds;
2245 	struct ceph_mds_session_head *h = msg->front.iov_base;
2246 	int wake = 0;
2247 
2248 	/* decode */
2249 	if (msg->front.iov_len != sizeof(*h))
2250 		goto bad;
2251 	op = le32_to_cpu(h->op);
2252 	seq = le64_to_cpu(h->seq);
2253 
2254 	mutex_lock(&mdsc->mutex);
2255 	if (op == CEPH_SESSION_CLOSE)
2256 		__unregister_session(mdsc, session);
2257 	/* FIXME: this ttl calculation is generous */
2258 	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
2259 	mutex_unlock(&mdsc->mutex);
2260 
2261 	mutex_lock(&session->s_mutex);
2262 
2263 	dout("handle_session mds%d %s %p state %s seq %llu\n",
2264 	     mds, ceph_session_op_name(op), session,
2265 	     session_state_name(session->s_state), seq);
2266 
2267 	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
2268 		session->s_state = CEPH_MDS_SESSION_OPEN;
2269 		pr_info("mds%d came back\n", session->s_mds);
2270 	}
2271 
2272 	switch (op) {
2273 	case CEPH_SESSION_OPEN:
2274 		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2275 			pr_info("mds%d reconnect success\n", session->s_mds);
2276 		session->s_state = CEPH_MDS_SESSION_OPEN;
2277 		renewed_caps(mdsc, session, 0);
2278 		wake = 1;
2279 		if (mdsc->stopping)
2280 			__close_session(mdsc, session);
2281 		break;
2282 
2283 	case CEPH_SESSION_RENEWCAPS:
2284 		if (session->s_renew_seq == seq)
2285 			renewed_caps(mdsc, session, 1);
2286 		break;
2287 
2288 	case CEPH_SESSION_CLOSE:
2289 		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2290 			pr_info("mds%d reconnect denied\n", session->s_mds);
2291 		remove_session_caps(session);
2292 		wake = 1; /* for good measure */
2293 		wake_up_all(&mdsc->session_close_wq);
2294 		kick_requests(mdsc, mds);
2295 		break;
2296 
2297 	case CEPH_SESSION_STALE:
2298 		pr_info("mds%d caps went stale, renewing\n",
2299 			session->s_mds);
2300 		spin_lock(&session->s_cap_lock);
2301 		session->s_cap_gen++;
2302 		session->s_cap_ttl = 0;
2303 		spin_unlock(&session->s_cap_lock);
2304 		send_renew_caps(mdsc, session);
2305 		break;
2306 
2307 	case CEPH_SESSION_RECALL_STATE:
2308 		trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2309 		break;
2310 
2311 	default:
2312 		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2313 		WARN_ON(1);
2314 	}
2315 
2316 	mutex_unlock(&session->s_mutex);
2317 	if (wake) {
2318 		mutex_lock(&mdsc->mutex);
2319 		__wake_requests(mdsc, &session->s_waiting);
2320 		mutex_unlock(&mdsc->mutex);
2321 	}
2322 	return;
2323 
2324 bad:
2325 	pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
2326 	       (int)msg->front.iov_len);
2327 	ceph_msg_dump(msg);
2328 	return;
2329 }
2330 
2331 
2332 /*
2333  * called under session->mutex.
2334  */
replay_unsafe_requests(struct ceph_mds_client * mdsc,struct ceph_mds_session * session)2335 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2336 				   struct ceph_mds_session *session)
2337 {
2338 	struct ceph_mds_request *req, *nreq;
2339 	int err;
2340 
2341 	dout("replay_unsafe_requests mds%d\n", session->s_mds);
2342 
2343 	mutex_lock(&mdsc->mutex);
2344 	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2345 		err = __prepare_send_request(mdsc, req, session->s_mds);
2346 		if (!err) {
2347 			ceph_msg_get(req->r_request);
2348 			ceph_con_send(&session->s_con, req->r_request);
2349 		}
2350 	}
2351 	mutex_unlock(&mdsc->mutex);
2352 }
2353 
2354 /*
2355  * Encode information about a cap for a reconnect with the MDS.
2356  */
encode_caps_cb(struct inode * inode,struct ceph_cap * cap,void * arg)2357 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2358 			  void *arg)
2359 {
2360 	union {
2361 		struct ceph_mds_cap_reconnect v2;
2362 		struct ceph_mds_cap_reconnect_v1 v1;
2363 	} rec;
2364 	size_t reclen;
2365 	struct ceph_inode_info *ci;
2366 	struct ceph_reconnect_state *recon_state = arg;
2367 	struct ceph_pagelist *pagelist = recon_state->pagelist;
2368 	char *path;
2369 	int pathlen, err;
2370 	u64 pathbase;
2371 	struct dentry *dentry;
2372 
2373 	ci = cap->ci;
2374 
2375 	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
2376 	     inode, ceph_vinop(inode), cap, cap->cap_id,
2377 	     ceph_cap_string(cap->issued));
2378 	err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
2379 	if (err)
2380 		return err;
2381 
2382 	dentry = d_find_alias(inode);
2383 	if (dentry) {
2384 		path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2385 		if (IS_ERR(path)) {
2386 			err = PTR_ERR(path);
2387 			goto out_dput;
2388 		}
2389 	} else {
2390 		path = NULL;
2391 		pathlen = 0;
2392 	}
2393 	err = ceph_pagelist_encode_string(pagelist, path, pathlen);
2394 	if (err)
2395 		goto out_free;
2396 
2397 	spin_lock(&inode->i_lock);
2398 	cap->seq = 0;        /* reset cap seq */
2399 	cap->issue_seq = 0;  /* and issue_seq */
2400 
2401 	if (recon_state->flock) {
2402 		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
2403 		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2404 		rec.v2.issued = cpu_to_le32(cap->issued);
2405 		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2406 		rec.v2.pathbase = cpu_to_le64(pathbase);
2407 		rec.v2.flock_len = 0;
2408 		reclen = sizeof(rec.v2);
2409 	} else {
2410 		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
2411 		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2412 		rec.v1.issued = cpu_to_le32(cap->issued);
2413 		rec.v1.size = cpu_to_le64(inode->i_size);
2414 		ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
2415 		ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
2416 		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2417 		rec.v1.pathbase = cpu_to_le64(pathbase);
2418 		reclen = sizeof(rec.v1);
2419 	}
2420 	spin_unlock(&inode->i_lock);
2421 
2422 	if (recon_state->flock) {
2423 		int num_fcntl_locks, num_flock_locks;
2424 		struct ceph_pagelist_cursor trunc_point;
2425 
2426 		ceph_pagelist_set_cursor(pagelist, &trunc_point);
2427 		do {
2428 			lock_flocks();
2429 			ceph_count_locks(inode, &num_fcntl_locks,
2430 					 &num_flock_locks);
2431 			rec.v2.flock_len = (2*sizeof(u32) +
2432 					    (num_fcntl_locks+num_flock_locks) *
2433 					    sizeof(struct ceph_filelock));
2434 			unlock_flocks();
2435 
2436 			/* pre-alloc pagelist */
2437 			ceph_pagelist_truncate(pagelist, &trunc_point);
2438 			err = ceph_pagelist_append(pagelist, &rec, reclen);
2439 			if (!err)
2440 				err = ceph_pagelist_reserve(pagelist,
2441 							    rec.v2.flock_len);
2442 
2443 			/* encode locks */
2444 			if (!err) {
2445 				lock_flocks();
2446 				err = ceph_encode_locks(inode,
2447 							pagelist,
2448 							num_fcntl_locks,
2449 							num_flock_locks);
2450 				unlock_flocks();
2451 			}
2452 		} while (err == -ENOSPC);
2453 	} else {
2454 		err = ceph_pagelist_append(pagelist, &rec, reclen);
2455 	}
2456 
2457 out_free:
2458 	kfree(path);
2459 out_dput:
2460 	dput(dentry);
2461 	return err;
2462 }
2463 
2464 
2465 /*
2466  * If an MDS fails and recovers, clients need to reconnect in order to
2467  * reestablish shared state.  This includes all caps issued through
2468  * this session _and_ the snap_realm hierarchy.  Because it's not
2469  * clear which snap realms the mds cares about, we send everything we
2470  * know about.. that ensures we'll then get any new info the
2471  * recovering MDS might have.
2472  *
2473  * This is a relatively heavyweight operation, but it's rare.
2474  *
2475  * called with mdsc->mutex held.
2476  */
send_mds_reconnect(struct ceph_mds_client * mdsc,struct ceph_mds_session * session)2477 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2478 			       struct ceph_mds_session *session)
2479 {
2480 	struct ceph_msg *reply;
2481 	struct rb_node *p;
2482 	int mds = session->s_mds;
2483 	int err = -ENOMEM;
2484 	struct ceph_pagelist *pagelist;
2485 	struct ceph_reconnect_state recon_state;
2486 
2487 	pr_info("mds%d reconnect start\n", mds);
2488 
2489 	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
2490 	if (!pagelist)
2491 		goto fail_nopagelist;
2492 	ceph_pagelist_init(pagelist);
2493 
2494 	reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS);
2495 	if (!reply)
2496 		goto fail_nomsg;
2497 
2498 	mutex_lock(&session->s_mutex);
2499 	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2500 	session->s_seq = 0;
2501 
2502 	ceph_con_open(&session->s_con,
2503 		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2504 
2505 	/* replay unsafe requests */
2506 	replay_unsafe_requests(mdsc, session);
2507 
2508 	down_read(&mdsc->snap_rwsem);
2509 
2510 	dout("session %p state %s\n", session,
2511 	     session_state_name(session->s_state));
2512 
2513 	/* drop old cap expires; we're about to reestablish that state */
2514 	discard_cap_releases(mdsc, session);
2515 
2516 	/* traverse this session's caps */
2517 	err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
2518 	if (err)
2519 		goto fail;
2520 
2521 	recon_state.pagelist = pagelist;
2522 	recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
2523 	err = iterate_session_caps(session, encode_caps_cb, &recon_state);
2524 	if (err < 0)
2525 		goto fail;
2526 
2527 	/*
2528 	 * snaprealms.  we provide mds with the ino, seq (version), and
2529 	 * parent for all of our realms.  If the mds has any newer info,
2530 	 * it will tell us.
2531 	 */
2532 	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
2533 		struct ceph_snap_realm *realm =
2534 			rb_entry(p, struct ceph_snap_realm, node);
2535 		struct ceph_mds_snaprealm_reconnect sr_rec;
2536 
2537 		dout(" adding snap realm %llx seq %lld parent %llx\n",
2538 		     realm->ino, realm->seq, realm->parent_ino);
2539 		sr_rec.ino = cpu_to_le64(realm->ino);
2540 		sr_rec.seq = cpu_to_le64(realm->seq);
2541 		sr_rec.parent = cpu_to_le64(realm->parent_ino);
2542 		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
2543 		if (err)
2544 			goto fail;
2545 	}
2546 
2547 	reply->pagelist = pagelist;
2548 	if (recon_state.flock)
2549 		reply->hdr.version = cpu_to_le16(2);
2550 	reply->hdr.data_len = cpu_to_le32(pagelist->length);
2551 	reply->nr_pages = calc_pages_for(0, pagelist->length);
2552 	ceph_con_send(&session->s_con, reply);
2553 
2554 	mutex_unlock(&session->s_mutex);
2555 
2556 	mutex_lock(&mdsc->mutex);
2557 	__wake_requests(mdsc, &session->s_waiting);
2558 	mutex_unlock(&mdsc->mutex);
2559 
2560 	up_read(&mdsc->snap_rwsem);
2561 	return;
2562 
2563 fail:
2564 	ceph_msg_put(reply);
2565 	up_read(&mdsc->snap_rwsem);
2566 	mutex_unlock(&session->s_mutex);
2567 fail_nomsg:
2568 	ceph_pagelist_release(pagelist);
2569 	kfree(pagelist);
2570 fail_nopagelist:
2571 	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
2572 	return;
2573 }
2574 
2575 
2576 /*
2577  * compare old and new mdsmaps, kicking requests
2578  * and closing out old connections as necessary
2579  *
2580  * called under mdsc->mutex.
2581  */
check_new_map(struct ceph_mds_client * mdsc,struct ceph_mdsmap * newmap,struct ceph_mdsmap * oldmap)2582 static void check_new_map(struct ceph_mds_client *mdsc,
2583 			  struct ceph_mdsmap *newmap,
2584 			  struct ceph_mdsmap *oldmap)
2585 {
2586 	int i;
2587 	int oldstate, newstate;
2588 	struct ceph_mds_session *s;
2589 
2590 	dout("check_new_map new %u old %u\n",
2591 	     newmap->m_epoch, oldmap->m_epoch);
2592 
2593 	for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
2594 		if (mdsc->sessions[i] == NULL)
2595 			continue;
2596 		s = mdsc->sessions[i];
2597 		oldstate = ceph_mdsmap_get_state(oldmap, i);
2598 		newstate = ceph_mdsmap_get_state(newmap, i);
2599 
2600 		dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
2601 		     i, ceph_mds_state_name(oldstate),
2602 		     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
2603 		     ceph_mds_state_name(newstate),
2604 		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
2605 		     session_state_name(s->s_state));
2606 
2607 		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
2608 			   ceph_mdsmap_get_addr(newmap, i),
2609 			   sizeof(struct ceph_entity_addr))) {
2610 			if (s->s_state == CEPH_MDS_SESSION_OPENING) {
2611 				/* the session never opened, just close it
2612 				 * out now */
2613 				__wake_requests(mdsc, &s->s_waiting);
2614 				__unregister_session(mdsc, s);
2615 			} else {
2616 				/* just close it */
2617 				mutex_unlock(&mdsc->mutex);
2618 				mutex_lock(&s->s_mutex);
2619 				mutex_lock(&mdsc->mutex);
2620 				ceph_con_close(&s->s_con);
2621 				mutex_unlock(&s->s_mutex);
2622 				s->s_state = CEPH_MDS_SESSION_RESTARTING;
2623 			}
2624 
2625 			/* kick any requests waiting on the recovering mds */
2626 			kick_requests(mdsc, i);
2627 		} else if (oldstate == newstate) {
2628 			continue;  /* nothing new with this mds */
2629 		}
2630 
2631 		/*
2632 		 * send reconnect?
2633 		 */
2634 		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
2635 		    newstate >= CEPH_MDS_STATE_RECONNECT) {
2636 			mutex_unlock(&mdsc->mutex);
2637 			send_mds_reconnect(mdsc, s);
2638 			mutex_lock(&mdsc->mutex);
2639 		}
2640 
2641 		/*
2642 		 * kick request on any mds that has gone active.
2643 		 */
2644 		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
2645 		    newstate >= CEPH_MDS_STATE_ACTIVE) {
2646 			if (oldstate != CEPH_MDS_STATE_CREATING &&
2647 			    oldstate != CEPH_MDS_STATE_STARTING)
2648 				pr_info("mds%d recovery completed\n", s->s_mds);
2649 			kick_requests(mdsc, i);
2650 			ceph_kick_flushing_caps(mdsc, s);
2651 			wake_up_session_caps(s, 1);
2652 		}
2653 	}
2654 
2655 	for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
2656 		s = mdsc->sessions[i];
2657 		if (!s)
2658 			continue;
2659 		if (!ceph_mdsmap_is_laggy(newmap, i))
2660 			continue;
2661 		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2662 		    s->s_state == CEPH_MDS_SESSION_HUNG ||
2663 		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
2664 			dout(" connecting to export targets of laggy mds%d\n",
2665 			     i);
2666 			__open_export_target_sessions(mdsc, s);
2667 		}
2668 	}
2669 }
2670 
2671 
2672 
2673 /*
2674  * leases
2675  */
2676 
2677 /*
2678  * caller must hold session s_mutex, dentry->d_lock
2679  */
__ceph_mdsc_drop_dentry_lease(struct dentry * dentry)2680 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
2681 {
2682 	struct ceph_dentry_info *di = ceph_dentry(dentry);
2683 
2684 	ceph_put_mds_session(di->lease_session);
2685 	di->lease_session = NULL;
2686 }
2687 
handle_lease(struct ceph_mds_client * mdsc,struct ceph_mds_session * session,struct ceph_msg * msg)2688 static void handle_lease(struct ceph_mds_client *mdsc,
2689 			 struct ceph_mds_session *session,
2690 			 struct ceph_msg *msg)
2691 {
2692 	struct super_block *sb = mdsc->fsc->sb;
2693 	struct inode *inode;
2694 	struct ceph_inode_info *ci;
2695 	struct dentry *parent, *dentry;
2696 	struct ceph_dentry_info *di;
2697 	int mds = session->s_mds;
2698 	struct ceph_mds_lease *h = msg->front.iov_base;
2699 	u32 seq;
2700 	struct ceph_vino vino;
2701 	int mask;
2702 	struct qstr dname;
2703 	int release = 0;
2704 
2705 	dout("handle_lease from mds%d\n", mds);
2706 
2707 	/* decode */
2708 	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
2709 		goto bad;
2710 	vino.ino = le64_to_cpu(h->ino);
2711 	vino.snap = CEPH_NOSNAP;
2712 	mask = le16_to_cpu(h->mask);
2713 	seq = le32_to_cpu(h->seq);
2714 	dname.name = (void *)h + sizeof(*h) + sizeof(u32);
2715 	dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
2716 	if (dname.len != get_unaligned_le32(h+1))
2717 		goto bad;
2718 
2719 	mutex_lock(&session->s_mutex);
2720 	session->s_seq++;
2721 
2722 	/* lookup inode */
2723 	inode = ceph_find_inode(sb, vino);
2724 	dout("handle_lease %s, mask %d, ino %llx %p %.*s\n",
2725 	     ceph_lease_op_name(h->action), mask, vino.ino, inode,
2726 	     dname.len, dname.name);
2727 	if (inode == NULL) {
2728 		dout("handle_lease no inode %llx\n", vino.ino);
2729 		goto release;
2730 	}
2731 	ci = ceph_inode(inode);
2732 
2733 	/* dentry */
2734 	parent = d_find_alias(inode);
2735 	if (!parent) {
2736 		dout("no parent dentry on inode %p\n", inode);
2737 		WARN_ON(1);
2738 		goto release;  /* hrm... */
2739 	}
2740 	dname.hash = full_name_hash(dname.name, dname.len);
2741 	dentry = d_lookup(parent, &dname);
2742 	dput(parent);
2743 	if (!dentry)
2744 		goto release;
2745 
2746 	spin_lock(&dentry->d_lock);
2747 	di = ceph_dentry(dentry);
2748 	switch (h->action) {
2749 	case CEPH_MDS_LEASE_REVOKE:
2750 		if (di && di->lease_session == session) {
2751 			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
2752 				h->seq = cpu_to_le32(di->lease_seq);
2753 			__ceph_mdsc_drop_dentry_lease(dentry);
2754 		}
2755 		release = 1;
2756 		break;
2757 
2758 	case CEPH_MDS_LEASE_RENEW:
2759 		if (di && di->lease_session == session &&
2760 		    di->lease_gen == session->s_cap_gen &&
2761 		    di->lease_renew_from &&
2762 		    di->lease_renew_after == 0) {
2763 			unsigned long duration =
2764 				le32_to_cpu(h->duration_ms) * HZ / 1000;
2765 
2766 			di->lease_seq = seq;
2767 			dentry->d_time = di->lease_renew_from + duration;
2768 			di->lease_renew_after = di->lease_renew_from +
2769 				(duration >> 1);
2770 			di->lease_renew_from = 0;
2771 		}
2772 		break;
2773 	}
2774 	spin_unlock(&dentry->d_lock);
2775 	dput(dentry);
2776 
2777 	if (!release)
2778 		goto out;
2779 
2780 release:
2781 	/* let's just reuse the same message */
2782 	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
2783 	ceph_msg_get(msg);
2784 	ceph_con_send(&session->s_con, msg);
2785 
2786 out:
2787 	iput(inode);
2788 	mutex_unlock(&session->s_mutex);
2789 	return;
2790 
2791 bad:
2792 	pr_err("corrupt lease message\n");
2793 	ceph_msg_dump(msg);
2794 }
2795 
ceph_mdsc_lease_send_msg(struct ceph_mds_session * session,struct inode * inode,struct dentry * dentry,char action,u32 seq)2796 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2797 			      struct inode *inode,
2798 			      struct dentry *dentry, char action,
2799 			      u32 seq)
2800 {
2801 	struct ceph_msg *msg;
2802 	struct ceph_mds_lease *lease;
2803 	int len = sizeof(*lease) + sizeof(u32);
2804 	int dnamelen = 0;
2805 
2806 	dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
2807 	     inode, dentry, ceph_lease_op_name(action), session->s_mds);
2808 	dnamelen = dentry->d_name.len;
2809 	len += dnamelen;
2810 
2811 	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS);
2812 	if (!msg)
2813 		return;
2814 	lease = msg->front.iov_base;
2815 	lease->action = action;
2816 	lease->mask = cpu_to_le16(1);
2817 	lease->ino = cpu_to_le64(ceph_vino(inode).ino);
2818 	lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
2819 	lease->seq = cpu_to_le32(seq);
2820 	put_unaligned_le32(dnamelen, lease + 1);
2821 	memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
2822 
2823 	/*
2824 	 * if this is a preemptive lease RELEASE, no need to
2825 	 * flush request stream, since the actual request will
2826 	 * soon follow.
2827 	 */
2828 	msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
2829 
2830 	ceph_con_send(&session->s_con, msg);
2831 }
2832 
2833 /*
2834  * Preemptively release a lease we expect to invalidate anyway.
2835  * Pass @inode always, @dentry is optional.
2836  */
ceph_mdsc_lease_release(struct ceph_mds_client * mdsc,struct inode * inode,struct dentry * dentry,int mask)2837 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
2838 			     struct dentry *dentry, int mask)
2839 {
2840 	struct ceph_dentry_info *di;
2841 	struct ceph_mds_session *session;
2842 	u32 seq;
2843 
2844 	BUG_ON(inode == NULL);
2845 	BUG_ON(dentry == NULL);
2846 	BUG_ON(mask == 0);
2847 
2848 	/* is dentry lease valid? */
2849 	spin_lock(&dentry->d_lock);
2850 	di = ceph_dentry(dentry);
2851 	if (!di || !di->lease_session ||
2852 	    di->lease_session->s_mds < 0 ||
2853 	    di->lease_gen != di->lease_session->s_cap_gen ||
2854 	    !time_before(jiffies, dentry->d_time)) {
2855 		dout("lease_release inode %p dentry %p -- "
2856 		     "no lease on %d\n",
2857 		     inode, dentry, mask);
2858 		spin_unlock(&dentry->d_lock);
2859 		return;
2860 	}
2861 
2862 	/* we do have a lease on this dentry; note mds and seq */
2863 	session = ceph_get_mds_session(di->lease_session);
2864 	seq = di->lease_seq;
2865 	__ceph_mdsc_drop_dentry_lease(dentry);
2866 	spin_unlock(&dentry->d_lock);
2867 
2868 	dout("lease_release inode %p dentry %p mask %d to mds%d\n",
2869 	     inode, dentry, mask, session->s_mds);
2870 	ceph_mdsc_lease_send_msg(session, inode, dentry,
2871 				 CEPH_MDS_LEASE_RELEASE, seq);
2872 	ceph_put_mds_session(session);
2873 }
2874 
2875 /*
2876  * drop all leases (and dentry refs) in preparation for umount
2877  */
drop_leases(struct ceph_mds_client * mdsc)2878 static void drop_leases(struct ceph_mds_client *mdsc)
2879 {
2880 	int i;
2881 
2882 	dout("drop_leases\n");
2883 	mutex_lock(&mdsc->mutex);
2884 	for (i = 0; i < mdsc->max_sessions; i++) {
2885 		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2886 		if (!s)
2887 			continue;
2888 		mutex_unlock(&mdsc->mutex);
2889 		mutex_lock(&s->s_mutex);
2890 		mutex_unlock(&s->s_mutex);
2891 		ceph_put_mds_session(s);
2892 		mutex_lock(&mdsc->mutex);
2893 	}
2894 	mutex_unlock(&mdsc->mutex);
2895 }
2896 
2897 
2898 
2899 /*
2900  * delayed work -- periodically trim expired leases, renew caps with mds
2901  */
schedule_delayed(struct ceph_mds_client * mdsc)2902 static void schedule_delayed(struct ceph_mds_client *mdsc)
2903 {
2904 	int delay = 5;
2905 	unsigned hz = round_jiffies_relative(HZ * delay);
2906 	schedule_delayed_work(&mdsc->delayed_work, hz);
2907 }
2908 
delayed_work(struct work_struct * work)2909 static void delayed_work(struct work_struct *work)
2910 {
2911 	int i;
2912 	struct ceph_mds_client *mdsc =
2913 		container_of(work, struct ceph_mds_client, delayed_work.work);
2914 	int renew_interval;
2915 	int renew_caps;
2916 
2917 	dout("mdsc delayed_work\n");
2918 	ceph_check_delayed_caps(mdsc);
2919 
2920 	mutex_lock(&mdsc->mutex);
2921 	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
2922 	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
2923 				   mdsc->last_renew_caps);
2924 	if (renew_caps)
2925 		mdsc->last_renew_caps = jiffies;
2926 
2927 	for (i = 0; i < mdsc->max_sessions; i++) {
2928 		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2929 		if (s == NULL)
2930 			continue;
2931 		if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
2932 			dout("resending session close request for mds%d\n",
2933 			     s->s_mds);
2934 			request_close_session(mdsc, s);
2935 			ceph_put_mds_session(s);
2936 			continue;
2937 		}
2938 		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
2939 			if (s->s_state == CEPH_MDS_SESSION_OPEN) {
2940 				s->s_state = CEPH_MDS_SESSION_HUNG;
2941 				pr_info("mds%d hung\n", s->s_mds);
2942 			}
2943 		}
2944 		if (s->s_state < CEPH_MDS_SESSION_OPEN) {
2945 			/* this mds is failed or recovering, just wait */
2946 			ceph_put_mds_session(s);
2947 			continue;
2948 		}
2949 		mutex_unlock(&mdsc->mutex);
2950 
2951 		mutex_lock(&s->s_mutex);
2952 		if (renew_caps)
2953 			send_renew_caps(mdsc, s);
2954 		else
2955 			ceph_con_keepalive(&s->s_con);
2956 		ceph_add_cap_releases(mdsc, s);
2957 		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2958 		    s->s_state == CEPH_MDS_SESSION_HUNG)
2959 			ceph_send_cap_releases(mdsc, s);
2960 		mutex_unlock(&s->s_mutex);
2961 		ceph_put_mds_session(s);
2962 
2963 		mutex_lock(&mdsc->mutex);
2964 	}
2965 	mutex_unlock(&mdsc->mutex);
2966 
2967 	schedule_delayed(mdsc);
2968 }
2969 
ceph_mdsc_init(struct ceph_fs_client * fsc)2970 int ceph_mdsc_init(struct ceph_fs_client *fsc)
2971 
2972 {
2973 	struct ceph_mds_client *mdsc;
2974 
2975 	mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
2976 	if (!mdsc)
2977 		return -ENOMEM;
2978 	mdsc->fsc = fsc;
2979 	fsc->mdsc = mdsc;
2980 	mutex_init(&mdsc->mutex);
2981 	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
2982 	if (mdsc->mdsmap == NULL)
2983 		return -ENOMEM;
2984 
2985 	init_completion(&mdsc->safe_umount_waiters);
2986 	init_waitqueue_head(&mdsc->session_close_wq);
2987 	INIT_LIST_HEAD(&mdsc->waiting_for_map);
2988 	mdsc->sessions = NULL;
2989 	mdsc->max_sessions = 0;
2990 	mdsc->stopping = 0;
2991 	init_rwsem(&mdsc->snap_rwsem);
2992 	mdsc->snap_realms = RB_ROOT;
2993 	INIT_LIST_HEAD(&mdsc->snap_empty);
2994 	spin_lock_init(&mdsc->snap_empty_lock);
2995 	mdsc->last_tid = 0;
2996 	mdsc->request_tree = RB_ROOT;
2997 	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
2998 	mdsc->last_renew_caps = jiffies;
2999 	INIT_LIST_HEAD(&mdsc->cap_delay_list);
3000 	spin_lock_init(&mdsc->cap_delay_lock);
3001 	INIT_LIST_HEAD(&mdsc->snap_flush_list);
3002 	spin_lock_init(&mdsc->snap_flush_lock);
3003 	mdsc->cap_flush_seq = 0;
3004 	INIT_LIST_HEAD(&mdsc->cap_dirty);
3005 	mdsc->num_cap_flushing = 0;
3006 	spin_lock_init(&mdsc->cap_dirty_lock);
3007 	init_waitqueue_head(&mdsc->cap_flushing_wq);
3008 	spin_lock_init(&mdsc->dentry_lru_lock);
3009 	INIT_LIST_HEAD(&mdsc->dentry_lru);
3010 
3011 	ceph_caps_init(mdsc);
3012 	ceph_adjust_min_caps(mdsc, fsc->min_caps);
3013 
3014 	return 0;
3015 }
3016 
3017 /*
3018  * Wait for safe replies on open mds requests.  If we time out, drop
3019  * all requests from the tree to avoid dangling dentry refs.
3020  */
wait_requests(struct ceph_mds_client * mdsc)3021 static void wait_requests(struct ceph_mds_client *mdsc)
3022 {
3023 	struct ceph_mds_request *req;
3024 	struct ceph_fs_client *fsc = mdsc->fsc;
3025 
3026 	mutex_lock(&mdsc->mutex);
3027 	if (__get_oldest_req(mdsc)) {
3028 		mutex_unlock(&mdsc->mutex);
3029 
3030 		dout("wait_requests waiting for requests\n");
3031 		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
3032 				    fsc->client->options->mount_timeout * HZ);
3033 
3034 		/* tear down remaining requests */
3035 		mutex_lock(&mdsc->mutex);
3036 		while ((req = __get_oldest_req(mdsc))) {
3037 			dout("wait_requests timed out on tid %llu\n",
3038 			     req->r_tid);
3039 			__unregister_request(mdsc, req);
3040 		}
3041 	}
3042 	mutex_unlock(&mdsc->mutex);
3043 	dout("wait_requests done\n");
3044 }
3045 
3046 /*
3047  * called before mount is ro, and before dentries are torn down.
3048  * (hmm, does this still race with new lookups?)
3049  */
ceph_mdsc_pre_umount(struct ceph_mds_client * mdsc)3050 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
3051 {
3052 	dout("pre_umount\n");
3053 	mdsc->stopping = 1;
3054 
3055 	drop_leases(mdsc);
3056 	ceph_flush_dirty_caps(mdsc);
3057 	wait_requests(mdsc);
3058 
3059 	/*
3060 	 * wait for reply handlers to drop their request refs and
3061 	 * their inode/dcache refs
3062 	 */
3063 	ceph_msgr_flush();
3064 }
3065 
3066 /*
3067  * wait for all write mds requests to flush.
3068  */
wait_unsafe_requests(struct ceph_mds_client * mdsc,u64 want_tid)3069 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
3070 {
3071 	struct ceph_mds_request *req = NULL, *nextreq;
3072 	struct rb_node *n;
3073 
3074 	mutex_lock(&mdsc->mutex);
3075 	dout("wait_unsafe_requests want %lld\n", want_tid);
3076 restart:
3077 	req = __get_oldest_req(mdsc);
3078 	while (req && req->r_tid <= want_tid) {
3079 		/* find next request */
3080 		n = rb_next(&req->r_node);
3081 		if (n)
3082 			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
3083 		else
3084 			nextreq = NULL;
3085 		if ((req->r_op & CEPH_MDS_OP_WRITE)) {
3086 			/* write op */
3087 			ceph_mdsc_get_request(req);
3088 			if (nextreq)
3089 				ceph_mdsc_get_request(nextreq);
3090 			mutex_unlock(&mdsc->mutex);
3091 			dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
3092 			     req->r_tid, want_tid);
3093 			wait_for_completion(&req->r_safe_completion);
3094 			mutex_lock(&mdsc->mutex);
3095 			ceph_mdsc_put_request(req);
3096 			if (!nextreq)
3097 				break;  /* next dne before, so we're done! */
3098 			if (RB_EMPTY_NODE(&nextreq->r_node)) {
3099 				/* next request was removed from tree */
3100 				ceph_mdsc_put_request(nextreq);
3101 				goto restart;
3102 			}
3103 			ceph_mdsc_put_request(nextreq);  /* won't go away */
3104 		}
3105 		req = nextreq;
3106 	}
3107 	mutex_unlock(&mdsc->mutex);
3108 	dout("wait_unsafe_requests done\n");
3109 }
3110 
ceph_mdsc_sync(struct ceph_mds_client * mdsc)3111 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3112 {
3113 	u64 want_tid, want_flush;
3114 
3115 	if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3116 		return;
3117 
3118 	dout("sync\n");
3119 	mutex_lock(&mdsc->mutex);
3120 	want_tid = mdsc->last_tid;
3121 	want_flush = mdsc->cap_flush_seq;
3122 	mutex_unlock(&mdsc->mutex);
3123 	dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
3124 
3125 	ceph_flush_dirty_caps(mdsc);
3126 
3127 	wait_unsafe_requests(mdsc, want_tid);
3128 	wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
3129 }
3130 
3131 /*
3132  * true if all sessions are closed, or we force unmount
3133  */
done_closing_sessions(struct ceph_mds_client * mdsc)3134 bool done_closing_sessions(struct ceph_mds_client *mdsc)
3135 {
3136 	int i, n = 0;
3137 
3138 	if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3139 		return true;
3140 
3141 	mutex_lock(&mdsc->mutex);
3142 	for (i = 0; i < mdsc->max_sessions; i++)
3143 		if (mdsc->sessions[i])
3144 			n++;
3145 	mutex_unlock(&mdsc->mutex);
3146 	return n == 0;
3147 }
3148 
3149 /*
3150  * called after sb is ro.
3151  */
ceph_mdsc_close_sessions(struct ceph_mds_client * mdsc)3152 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3153 {
3154 	struct ceph_mds_session *session;
3155 	int i;
3156 	struct ceph_fs_client *fsc = mdsc->fsc;
3157 	unsigned long timeout = fsc->client->options->mount_timeout * HZ;
3158 
3159 	dout("close_sessions\n");
3160 
3161 	/* close sessions */
3162 	mutex_lock(&mdsc->mutex);
3163 	for (i = 0; i < mdsc->max_sessions; i++) {
3164 		session = __ceph_lookup_mds_session(mdsc, i);
3165 		if (!session)
3166 			continue;
3167 		mutex_unlock(&mdsc->mutex);
3168 		mutex_lock(&session->s_mutex);
3169 		__close_session(mdsc, session);
3170 		mutex_unlock(&session->s_mutex);
3171 		ceph_put_mds_session(session);
3172 		mutex_lock(&mdsc->mutex);
3173 	}
3174 	mutex_unlock(&mdsc->mutex);
3175 
3176 	dout("waiting for sessions to close\n");
3177 	wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
3178 			   timeout);
3179 
3180 	/* tear down remaining sessions */
3181 	mutex_lock(&mdsc->mutex);
3182 	for (i = 0; i < mdsc->max_sessions; i++) {
3183 		if (mdsc->sessions[i]) {
3184 			session = get_session(mdsc->sessions[i]);
3185 			__unregister_session(mdsc, session);
3186 			mutex_unlock(&mdsc->mutex);
3187 			mutex_lock(&session->s_mutex);
3188 			remove_session_caps(session);
3189 			mutex_unlock(&session->s_mutex);
3190 			ceph_put_mds_session(session);
3191 			mutex_lock(&mdsc->mutex);
3192 		}
3193 	}
3194 	WARN_ON(!list_empty(&mdsc->cap_delay_list));
3195 	mutex_unlock(&mdsc->mutex);
3196 
3197 	ceph_cleanup_empty_realms(mdsc);
3198 
3199 	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3200 
3201 	dout("stopped\n");
3202 }
3203 
ceph_mdsc_stop(struct ceph_mds_client * mdsc)3204 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3205 {
3206 	dout("stop\n");
3207 	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3208 	if (mdsc->mdsmap)
3209 		ceph_mdsmap_destroy(mdsc->mdsmap);
3210 	kfree(mdsc->sessions);
3211 	ceph_caps_finalize(mdsc);
3212 }
3213 
ceph_mdsc_destroy(struct ceph_fs_client * fsc)3214 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
3215 {
3216 	struct ceph_mds_client *mdsc = fsc->mdsc;
3217 
3218 	dout("mdsc_destroy %p\n", mdsc);
3219 	ceph_mdsc_stop(mdsc);
3220 
3221 	/* flush out any connection work with references to us */
3222 	ceph_msgr_flush();
3223 
3224 	fsc->mdsc = NULL;
3225 	kfree(mdsc);
3226 	dout("mdsc_destroy %p done\n", mdsc);
3227 }
3228 
3229 
3230 /*
3231  * handle mds map update.
3232  */
ceph_mdsc_handle_map(struct ceph_mds_client * mdsc,struct ceph_msg * msg)3233 void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3234 {
3235 	u32 epoch;
3236 	u32 maplen;
3237 	void *p = msg->front.iov_base;
3238 	void *end = p + msg->front.iov_len;
3239 	struct ceph_mdsmap *newmap, *oldmap;
3240 	struct ceph_fsid fsid;
3241 	int err = -EINVAL;
3242 
3243 	ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
3244 	ceph_decode_copy(&p, &fsid, sizeof(fsid));
3245 	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
3246 		return;
3247 	epoch = ceph_decode_32(&p);
3248 	maplen = ceph_decode_32(&p);
3249 	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3250 
3251 	/* do we need it? */
3252 	ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
3253 	mutex_lock(&mdsc->mutex);
3254 	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3255 		dout("handle_map epoch %u <= our %u\n",
3256 		     epoch, mdsc->mdsmap->m_epoch);
3257 		mutex_unlock(&mdsc->mutex);
3258 		return;
3259 	}
3260 
3261 	newmap = ceph_mdsmap_decode(&p, end);
3262 	if (IS_ERR(newmap)) {
3263 		err = PTR_ERR(newmap);
3264 		goto bad_unlock;
3265 	}
3266 
3267 	/* swap into place */
3268 	if (mdsc->mdsmap) {
3269 		oldmap = mdsc->mdsmap;
3270 		mdsc->mdsmap = newmap;
3271 		check_new_map(mdsc, newmap, oldmap);
3272 		ceph_mdsmap_destroy(oldmap);
3273 	} else {
3274 		mdsc->mdsmap = newmap;  /* first mds map */
3275 	}
3276 	mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
3277 
3278 	__wake_requests(mdsc, &mdsc->waiting_for_map);
3279 
3280 	mutex_unlock(&mdsc->mutex);
3281 	schedule_delayed(mdsc);
3282 	return;
3283 
3284 bad_unlock:
3285 	mutex_unlock(&mdsc->mutex);
3286 bad:
3287 	pr_err("error decoding mdsmap %d\n", err);
3288 	return;
3289 }
3290 
con_get(struct ceph_connection * con)3291 static struct ceph_connection *con_get(struct ceph_connection *con)
3292 {
3293 	struct ceph_mds_session *s = con->private;
3294 
3295 	if (get_session(s)) {
3296 		dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
3297 		return con;
3298 	}
3299 	dout("mdsc con_get %p FAIL\n", s);
3300 	return NULL;
3301 }
3302 
con_put(struct ceph_connection * con)3303 static void con_put(struct ceph_connection *con)
3304 {
3305 	struct ceph_mds_session *s = con->private;
3306 
3307 	dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
3308 	ceph_put_mds_session(s);
3309 }
3310 
3311 /*
3312  * if the client is unresponsive for long enough, the mds will kill
3313  * the session entirely.
3314  */
peer_reset(struct ceph_connection * con)3315 static void peer_reset(struct ceph_connection *con)
3316 {
3317 	struct ceph_mds_session *s = con->private;
3318 	struct ceph_mds_client *mdsc = s->s_mdsc;
3319 
3320 	pr_warning("mds%d closed our session\n", s->s_mds);
3321 	send_mds_reconnect(mdsc, s);
3322 }
3323 
dispatch(struct ceph_connection * con,struct ceph_msg * msg)3324 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
3325 {
3326 	struct ceph_mds_session *s = con->private;
3327 	struct ceph_mds_client *mdsc = s->s_mdsc;
3328 	int type = le16_to_cpu(msg->hdr.type);
3329 
3330 	mutex_lock(&mdsc->mutex);
3331 	if (__verify_registered_session(mdsc, s) < 0) {
3332 		mutex_unlock(&mdsc->mutex);
3333 		goto out;
3334 	}
3335 	mutex_unlock(&mdsc->mutex);
3336 
3337 	switch (type) {
3338 	case CEPH_MSG_MDS_MAP:
3339 		ceph_mdsc_handle_map(mdsc, msg);
3340 		break;
3341 	case CEPH_MSG_CLIENT_SESSION:
3342 		handle_session(s, msg);
3343 		break;
3344 	case CEPH_MSG_CLIENT_REPLY:
3345 		handle_reply(s, msg);
3346 		break;
3347 	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
3348 		handle_forward(mdsc, s, msg);
3349 		break;
3350 	case CEPH_MSG_CLIENT_CAPS:
3351 		ceph_handle_caps(s, msg);
3352 		break;
3353 	case CEPH_MSG_CLIENT_SNAP:
3354 		ceph_handle_snap(mdsc, s, msg);
3355 		break;
3356 	case CEPH_MSG_CLIENT_LEASE:
3357 		handle_lease(mdsc, s, msg);
3358 		break;
3359 
3360 	default:
3361 		pr_err("received unknown message type %d %s\n", type,
3362 		       ceph_msg_type_name(type));
3363 	}
3364 out:
3365 	ceph_msg_put(msg);
3366 }
3367 
3368 /*
3369  * authentication
3370  */
get_authorizer(struct ceph_connection * con,void ** buf,int * len,int * proto,void ** reply_buf,int * reply_len,int force_new)3371 static int get_authorizer(struct ceph_connection *con,
3372 			  void **buf, int *len, int *proto,
3373 			  void **reply_buf, int *reply_len, int force_new)
3374 {
3375 	struct ceph_mds_session *s = con->private;
3376 	struct ceph_mds_client *mdsc = s->s_mdsc;
3377 	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3378 	int ret = 0;
3379 
3380 	if (force_new && s->s_authorizer) {
3381 		ac->ops->destroy_authorizer(ac, s->s_authorizer);
3382 		s->s_authorizer = NULL;
3383 	}
3384 	if (s->s_authorizer == NULL) {
3385 		if (ac->ops->create_authorizer) {
3386 			ret = ac->ops->create_authorizer(
3387 				ac, CEPH_ENTITY_TYPE_MDS,
3388 				&s->s_authorizer,
3389 				&s->s_authorizer_buf,
3390 				&s->s_authorizer_buf_len,
3391 				&s->s_authorizer_reply_buf,
3392 				&s->s_authorizer_reply_buf_len);
3393 			if (ret)
3394 				return ret;
3395 		}
3396 	}
3397 
3398 	*proto = ac->protocol;
3399 	*buf = s->s_authorizer_buf;
3400 	*len = s->s_authorizer_buf_len;
3401 	*reply_buf = s->s_authorizer_reply_buf;
3402 	*reply_len = s->s_authorizer_reply_buf_len;
3403 	return 0;
3404 }
3405 
3406 
verify_authorizer_reply(struct ceph_connection * con,int len)3407 static int verify_authorizer_reply(struct ceph_connection *con, int len)
3408 {
3409 	struct ceph_mds_session *s = con->private;
3410 	struct ceph_mds_client *mdsc = s->s_mdsc;
3411 	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3412 
3413 	return ac->ops->verify_authorizer_reply(ac, s->s_authorizer, len);
3414 }
3415 
invalidate_authorizer(struct ceph_connection * con)3416 static int invalidate_authorizer(struct ceph_connection *con)
3417 {
3418 	struct ceph_mds_session *s = con->private;
3419 	struct ceph_mds_client *mdsc = s->s_mdsc;
3420 	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3421 
3422 	if (ac->ops->invalidate_authorizer)
3423 		ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
3424 
3425 	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
3426 }
3427 
3428 static const struct ceph_connection_operations mds_con_ops = {
3429 	.get = con_get,
3430 	.put = con_put,
3431 	.dispatch = dispatch,
3432 	.get_authorizer = get_authorizer,
3433 	.verify_authorizer_reply = verify_authorizer_reply,
3434 	.invalidate_authorizer = invalidate_authorizer,
3435 	.peer_reset = peer_reset,
3436 };
3437 
3438 /* eof */
3439