1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "lowcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27 
28 static int			ls_count;
29 static struct mutex		ls_lock;
30 static struct list_head		lslist;
31 static spinlock_t		lslist_lock;
32 static struct task_struct *	scand_task;
33 
34 
dlm_control_store(struct dlm_ls * ls,const char * buf,size_t len)35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37 	ssize_t ret = len;
38 	int n = simple_strtol(buf, NULL, 0);
39 
40 	ls = dlm_find_lockspace_local(ls->ls_local_handle);
41 	if (!ls)
42 		return -EINVAL;
43 
44 	switch (n) {
45 	case 0:
46 		dlm_ls_stop(ls);
47 		break;
48 	case 1:
49 		dlm_ls_start(ls);
50 		break;
51 	default:
52 		ret = -EINVAL;
53 	}
54 	dlm_put_lockspace(ls);
55 	return ret;
56 }
57 
dlm_event_store(struct dlm_ls * ls,const char * buf,size_t len)58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59 {
60 	ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62 	wake_up(&ls->ls_uevent_wait);
63 	return len;
64 }
65 
dlm_id_show(struct dlm_ls * ls,char * buf)66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
67 {
68 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
69 }
70 
dlm_id_store(struct dlm_ls * ls,const char * buf,size_t len)71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72 {
73 	ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74 	return len;
75 }
76 
dlm_recover_status_show(struct dlm_ls * ls,char * buf)77 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
78 {
79 	uint32_t status = dlm_recover_status(ls);
80 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
81 }
82 
dlm_recover_nodeid_show(struct dlm_ls * ls,char * buf)83 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
84 {
85 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
86 }
87 
88 struct dlm_attr {
89 	struct attribute attr;
90 	ssize_t (*show)(struct dlm_ls *, char *);
91 	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
92 };
93 
94 static struct dlm_attr dlm_attr_control = {
95 	.attr  = {.name = "control", .mode = S_IWUSR},
96 	.store = dlm_control_store
97 };
98 
99 static struct dlm_attr dlm_attr_event = {
100 	.attr  = {.name = "event_done", .mode = S_IWUSR},
101 	.store = dlm_event_store
102 };
103 
104 static struct dlm_attr dlm_attr_id = {
105 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
106 	.show  = dlm_id_show,
107 	.store = dlm_id_store
108 };
109 
110 static struct dlm_attr dlm_attr_recover_status = {
111 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
112 	.show  = dlm_recover_status_show
113 };
114 
115 static struct dlm_attr dlm_attr_recover_nodeid = {
116 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
117 	.show  = dlm_recover_nodeid_show
118 };
119 
120 static struct attribute *dlm_attrs[] = {
121 	&dlm_attr_control.attr,
122 	&dlm_attr_event.attr,
123 	&dlm_attr_id.attr,
124 	&dlm_attr_recover_status.attr,
125 	&dlm_attr_recover_nodeid.attr,
126 	NULL,
127 };
128 
dlm_attr_show(struct kobject * kobj,struct attribute * attr,char * buf)129 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
130 			     char *buf)
131 {
132 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
133 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
134 	return a->show ? a->show(ls, buf) : 0;
135 }
136 
dlm_attr_store(struct kobject * kobj,struct attribute * attr,const char * buf,size_t len)137 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
138 			      const char *buf, size_t len)
139 {
140 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
141 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
142 	return a->store ? a->store(ls, buf, len) : len;
143 }
144 
lockspace_kobj_release(struct kobject * k)145 static void lockspace_kobj_release(struct kobject *k)
146 {
147 	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
148 	kfree(ls);
149 }
150 
151 static const struct sysfs_ops dlm_attr_ops = {
152 	.show  = dlm_attr_show,
153 	.store = dlm_attr_store,
154 };
155 
156 static struct kobj_type dlm_ktype = {
157 	.default_attrs = dlm_attrs,
158 	.sysfs_ops     = &dlm_attr_ops,
159 	.release       = lockspace_kobj_release,
160 };
161 
162 static struct kset *dlm_kset;
163 
do_uevent(struct dlm_ls * ls,int in)164 static int do_uevent(struct dlm_ls *ls, int in)
165 {
166 	int error;
167 
168 	if (in)
169 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
170 	else
171 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
172 
173 	log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
174 
175 	/* dlm_controld will see the uevent, do the necessary group management
176 	   and then write to sysfs to wake us */
177 
178 	error = wait_event_interruptible(ls->ls_uevent_wait,
179 			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
180 
181 	log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
182 
183 	if (error)
184 		goto out;
185 
186 	error = ls->ls_uevent_result;
187  out:
188 	if (error)
189 		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
190 			  error, ls->ls_uevent_result);
191 	return error;
192 }
193 
dlm_uevent(struct kset * kset,struct kobject * kobj,struct kobj_uevent_env * env)194 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
195 		      struct kobj_uevent_env *env)
196 {
197 	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
198 
199 	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
200 	return 0;
201 }
202 
203 static struct kset_uevent_ops dlm_uevent_ops = {
204 	.uevent = dlm_uevent,
205 };
206 
dlm_lockspace_init(void)207 int __init dlm_lockspace_init(void)
208 {
209 	ls_count = 0;
210 	mutex_init(&ls_lock);
211 	INIT_LIST_HEAD(&lslist);
212 	spin_lock_init(&lslist_lock);
213 
214 	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
215 	if (!dlm_kset) {
216 		printk(KERN_WARNING "%s: can not create kset\n", __func__);
217 		return -ENOMEM;
218 	}
219 	return 0;
220 }
221 
dlm_lockspace_exit(void)222 void dlm_lockspace_exit(void)
223 {
224 	kset_unregister(dlm_kset);
225 }
226 
find_ls_to_scan(void)227 static struct dlm_ls *find_ls_to_scan(void)
228 {
229 	struct dlm_ls *ls;
230 
231 	spin_lock(&lslist_lock);
232 	list_for_each_entry(ls, &lslist, ls_list) {
233 		if (time_after_eq(jiffies, ls->ls_scan_time +
234 					    dlm_config.ci_scan_secs * HZ)) {
235 			spin_unlock(&lslist_lock);
236 			return ls;
237 		}
238 	}
239 	spin_unlock(&lslist_lock);
240 	return NULL;
241 }
242 
dlm_scand(void * data)243 static int dlm_scand(void *data)
244 {
245 	struct dlm_ls *ls;
246 
247 	while (!kthread_should_stop()) {
248 		ls = find_ls_to_scan();
249 		if (ls) {
250 			if (dlm_lock_recovery_try(ls)) {
251 				ls->ls_scan_time = jiffies;
252 				dlm_scan_rsbs(ls);
253 				dlm_scan_timeout(ls);
254 				dlm_scan_waiters(ls);
255 				dlm_unlock_recovery(ls);
256 			} else {
257 				ls->ls_scan_time += HZ;
258 			}
259 			continue;
260 		}
261 		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
262 	}
263 	return 0;
264 }
265 
dlm_scand_start(void)266 static int dlm_scand_start(void)
267 {
268 	struct task_struct *p;
269 	int error = 0;
270 
271 	p = kthread_run(dlm_scand, NULL, "dlm_scand");
272 	if (IS_ERR(p))
273 		error = PTR_ERR(p);
274 	else
275 		scand_task = p;
276 	return error;
277 }
278 
dlm_scand_stop(void)279 static void dlm_scand_stop(void)
280 {
281 	kthread_stop(scand_task);
282 }
283 
/*
 * Look up a lockspace by its global id.  On a match ls_count is
 * incremented under lslist_lock; the caller drops it again with
 * dlm_put_lockspace().  Returns NULL if no lockspace has that id.
 */
struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *ls;
	struct dlm_ls *found = NULL;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_global_id != id)
			continue;
		ls->ls_count++;
		found = ls;
		break;
	}
	spin_unlock(&lslist_lock);

	return found;
}
301 
/*
 * Look up a lockspace by its local handle.  On a match ls_count is
 * incremented under lslist_lock; the caller drops it again with
 * dlm_put_lockspace().  Returns NULL if the handle is not on lslist.
 */
struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *ls;
	struct dlm_ls *found = NULL;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_local_handle != lockspace)
			continue;
		ls->ls_count++;
		found = ls;
		break;
	}
	spin_unlock(&lslist_lock);

	return found;
}
318 
dlm_find_lockspace_device(int minor)319 struct dlm_ls *dlm_find_lockspace_device(int minor)
320 {
321 	struct dlm_ls *ls;
322 
323 	spin_lock(&lslist_lock);
324 	list_for_each_entry(ls, &lslist, ls_list) {
325 		if (ls->ls_device.minor == minor) {
326 			ls->ls_count++;
327 			goto out;
328 		}
329 	}
330 	ls = NULL;
331  out:
332 	spin_unlock(&lslist_lock);
333 	return ls;
334 }
335 
dlm_put_lockspace(struct dlm_ls * ls)336 void dlm_put_lockspace(struct dlm_ls *ls)
337 {
338 	spin_lock(&lslist_lock);
339 	ls->ls_count--;
340 	spin_unlock(&lslist_lock);
341 }
342 
remove_lockspace(struct dlm_ls * ls)343 static void remove_lockspace(struct dlm_ls *ls)
344 {
345 	for (;;) {
346 		spin_lock(&lslist_lock);
347 		if (ls->ls_count == 0) {
348 			WARN_ON(ls->ls_create_count != 0);
349 			list_del(&ls->ls_list);
350 			spin_unlock(&lslist_lock);
351 			return;
352 		}
353 		spin_unlock(&lslist_lock);
354 		ssleep(1);
355 	}
356 }
357 
/*
 * Start the threads shared by all lockspaces: dlm_scand, then lowcomms.
 * If lowcomms fails to start, dlm_scand is stopped again.
 * Returns 0 on success or the error of the step that failed.
 */
static int threads_start(void)
{
	int rv;

	rv = dlm_scand_start();
	if (rv) {
		log_print("cannot start dlm_scand thread %d", rv);
		return rv;
	}

	/* lowcomms sends and receives the messages of every lockspace */
	rv = dlm_lowcomms_start();
	if (rv) {
		log_print("cannot start dlm lowcomms %d", rv);
		dlm_scand_stop();
		return rv;
	}

	return 0;
}
382 
/* Stop what threads_start() started: dlm_scand first, then lowcomms. */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
}
388 
/*
 * Create the lockspace called name, or take another creation reference on
 * it if it already exists.
 *
 * Returns 0 when a new lockspace was created and joined, 1 when an existing
 * lockspace of that name was found and its ls_create_count was incremented
 * (*lockspace is set in both cases), or a negative errno:
 *   -EINVAL   empty or over-long name, lvblen zero or not a multiple of 8,
 *             or try_module_get() failed
 *   -EUNATCH  the dlm user daemon is not available
 *   -EBADR    cluster does not match the configured cluster name
 *   -EEXIST   the name exists and DLM_LSFL_NEWEXCL was given
 *   -ENOMEM   an allocation failed
 * or whatever error a later setup step reported.
 *
 * If both ops and ops_result are given, *ops_result is set to 0 when
 * recovery callbacks are enabled in dlm_config and to -EOPNOTSUPP otherwise.
 *
 * The module reference taken here is kept only when a new lockspace was
 * created (return 0); every other path drops it again.
 */
static int new_lockspace(const char *name, const char *cluster,
			 uint32_t flags, int lvblen,
			 const struct dlm_lockspace_ops *ops, void *ops_arg,
			 int *ops_result, dlm_lockspace_t **lockspace)
{
	struct dlm_ls *ls;
	int i, size, error;
	int do_unreg = 0;
	int namelen = strlen(name);

	/* an empty name can never be registered as a kobject, so refuse it
	   here rather than after threads and tables have been set up */
	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
		return -EINVAL;

	if (!lvblen || (lvblen % 8))
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	if (!dlm_user_daemon_available()) {
		log_print("dlm user daemon not available");
		error = -EUNATCH;
		goto out;
	}

	if (ops && ops_result) {
		if (!dlm_config.ci_recover_callbacks)
			*ops_result = -EOPNOTSUPP;
		else
			*ops_result = 0;
	}

	if (dlm_config.ci_recover_callbacks && cluster &&
	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
		log_print("dlm cluster name %s mismatch %s",
			  dlm_config.ci_cluster_name, cluster);
		error = -EBADR;
		goto out;
	}

	error = 0;

	/* look for an existing lockspace with the same name */
	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		error = 1;
		break;
	}
	spin_unlock(&lslist_lock);

	if (error)
		goto out;

	error = -ENOMEM;

	/* namelen extra bytes hold the name that is copied into ls_name */
	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	ls->ls_count = 0;
	ls->ls_flags = 0;
	ls->ls_scan_time = jiffies;

	if (ops && dlm_config.ci_recover_callbacks) {
		ls->ls_ops = ops;
		ls->ls_ops_arg = ops_arg;
	}

	if (flags & DLM_LSFL_TIMEWARN)
		set_bit(LSFL_TIMEWARN, &ls->ls_flags);

	/* ls_exflags are forced to match among nodes, and we don't
	   need to require all nodes to have some flags set */
	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
				    DLM_LSFL_NEWEXCL));

	size = dlm_config.ci_rsbtbl_size;
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
	if (!ls->ls_rsbtbl)
		goto out_lsfree;
	for (i = 0; i < size; i++) {
		ls->ls_rsbtbl[i].keep.rb_node = NULL;
		ls->ls_rsbtbl[i].toss.rb_node = NULL;
		spin_lock_init(&ls->ls_rsbtbl[i].lock);
	}

	idr_init(&ls->ls_lkbidr);
	spin_lock_init(&ls->ls_lkbidr_spin);

	size = dlm_config.ci_dirtbl_size;
	ls->ls_dirtbl_size = size;

	ls->ls_dirtbl = vmalloc(sizeof(struct dlm_dirtable) * size);
	if (!ls->ls_dirtbl)
		goto out_lkbfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
		spin_lock_init(&ls->ls_dirtbl[i].lock);
	}

	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);
	INIT_LIST_HEAD(&ls->ls_orphans);
	mutex_init(&ls->ls_orphans_mutex);
	INIT_LIST_HEAD(&ls->ls_timeout);
	mutex_init(&ls->ls_timeout_mutex);

	INIT_LIST_HEAD(&ls->ls_new_rsb);
	spin_lock_init(&ls->ls_new_rsb_spin);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_stub_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_members_done);
	ls->ls_members_result = -1;

	mutex_init(&ls->ls_cb_mutex);
	INIT_LIST_HEAD(&ls->ls_cb_delay);

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = 0;
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	init_rwsem(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	mutex_init(&ls->ls_requestqueue_mutex);
	mutex_init(&ls->ls_clear_proc_locks);

	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
	if (!ls->ls_recover_buf)
		goto out_dirfree;

	ls->ls_slot = 0;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_slots = NULL;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	ls->ls_recover_list_count = 0;
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);

	down_write(&ls->ls_in_recovery);

	spin_lock(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);

	if (flags & DLM_LSFL_FS) {
		error = dlm_callback_start(ls);
		if (error) {
			log_error(ls, "can't start dlm_callback %d", error);
			goto out_delist;
		}
	}

	/* needs to find ls in lslist */
	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_callback;
	}

	/* From here on the kobject owns ls.  This also holds when
	   kobject_init_and_add() itself fails: the error path must then
	   drop the kobject with kobject_put() so that its name is released
	   and lockspace_kobj_release() frees ls. */
	do_unreg = 1;

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_recoverd;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_recoverd;

	wait_for_completion(&ls->ls_members_done);
	error = ls->ls_members_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_debug(ls, "join complete");
	*lockspace = ls;
	return 0;

 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_recoverd:
	dlm_recoverd_stop(ls);
 out_callback:
	dlm_callback_stop(ls);
 out_delist:
	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	kfree(ls->ls_recover_buf);
 out_dirfree:
	vfree(ls->ls_dirtbl);
 out_lkbfree:
	idr_destroy(&ls->ls_lkbidr);
	vfree(ls->ls_rsbtbl);
 out_lsfree:
	/* once the kobject has been set up, ls is freed by its release
	   callback rather than directly */
	if (do_unreg)
		kobject_put(&ls->ls_kobj);
	else
		kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}
645 
/*
 * Create or look up a lockspace; see new_lockspace() for the arguments.
 *
 * Runs under ls_lock.  The shared threads are started first when no
 * lockspace exists yet, and stopped again if none exists afterwards.
 * Returns 0 both for a newly created lockspace and for an existing one
 * that was reused, or a negative errno.
 */
int dlm_new_lockspace(const char *name, const char *cluster,
		      uint32_t flags, int lvblen,
		      const struct dlm_lockspace_ops *ops, void *ops_arg,
		      int *ops_result, dlm_lockspace_t **lockspace)
{
	int rv = 0;

	mutex_lock(&ls_lock);

	if (!ls_count) {
		rv = threads_start();
		if (rv)
			goto unlock;
	}

	rv = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
			   ops_result, lockspace);
	if (rv == 0)
		ls_count++;		/* a new lockspace was created */
	else if (rv > 0)
		rv = 0;			/* an existing one was reused */

	if (!ls_count)
		threads_stop();
 unlock:
	mutex_unlock(&ls_lock);
	return rv;
}
671 
lkb_idr_is_local(int id,void * p,void * data)672 static int lkb_idr_is_local(int id, void *p, void *data)
673 {
674 	struct dlm_lkb *lkb = p;
675 
676 	if (!lkb->lkb_nodeid)
677 		return 1;
678 	return 0;
679 }
680 
/*
 * idr_for_each() callback used by lockspace_busy() with force == 0:
 * returns 1 for every entry.  All arguments are unused.
 */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}
685 
lkb_idr_free(int id,void * p,void * data)686 static int lkb_idr_free(int id, void *p, void *data)
687 {
688 	struct dlm_lkb *lkb = p;
689 
690 	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
691 		dlm_free_lvb(lkb->lkb_lvbptr);
692 
693 	dlm_free_lkb(lkb);
694 	return 0;
695 }
696 
697 /* NOTE: We check the lkbidr here rather than the resource table.
698    This is because there may be LKBs queued as ASTs that have been unlinked
699    from their RSBs and are pending deletion once the AST has been delivered */
700 
lockspace_busy(struct dlm_ls * ls,int force)701 static int lockspace_busy(struct dlm_ls *ls, int force)
702 {
703 	int rv;
704 
705 	spin_lock(&ls->ls_lkbidr_spin);
706 	if (force == 0) {
707 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
708 	} else if (force == 1) {
709 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
710 	} else {
711 		rv = 0;
712 	}
713 	spin_unlock(&ls->ls_lkbidr_spin);
714 	return rv;
715 }
716 
release_lockspace(struct dlm_ls * ls,int force)717 static int release_lockspace(struct dlm_ls *ls, int force)
718 {
719 	struct dlm_rsb *rsb;
720 	struct rb_node *n;
721 	int i, busy, rv;
722 
723 	busy = lockspace_busy(ls, force);
724 
725 	spin_lock(&lslist_lock);
726 	if (ls->ls_create_count == 1) {
727 		if (busy) {
728 			rv = -EBUSY;
729 		} else {
730 			/* remove_lockspace takes ls off lslist */
731 			ls->ls_create_count = 0;
732 			rv = 0;
733 		}
734 	} else if (ls->ls_create_count > 1) {
735 		rv = --ls->ls_create_count;
736 	} else {
737 		rv = -EINVAL;
738 	}
739 	spin_unlock(&lslist_lock);
740 
741 	if (rv) {
742 		log_debug(ls, "release_lockspace no remove %d", rv);
743 		return rv;
744 	}
745 
746 	dlm_device_deregister(ls);
747 
748 	if (force < 3 && dlm_user_daemon_available())
749 		do_uevent(ls, 0);
750 
751 	dlm_recoverd_stop(ls);
752 
753 	dlm_callback_stop(ls);
754 
755 	remove_lockspace(ls);
756 
757 	dlm_delete_debug_file(ls);
758 
759 	kfree(ls->ls_recover_buf);
760 
761 	/*
762 	 * Free direntry structs.
763 	 */
764 
765 	dlm_dir_clear(ls);
766 	vfree(ls->ls_dirtbl);
767 
768 	/*
769 	 * Free all lkb's in idr
770 	 */
771 
772 	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
773 	idr_remove_all(&ls->ls_lkbidr);
774 	idr_destroy(&ls->ls_lkbidr);
775 
776 	/*
777 	 * Free all rsb's on rsbtbl[] lists
778 	 */
779 
780 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
781 		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
782 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
783 			rb_erase(n, &ls->ls_rsbtbl[i].keep);
784 			dlm_free_rsb(rsb);
785 		}
786 
787 		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
788 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
789 			rb_erase(n, &ls->ls_rsbtbl[i].toss);
790 			dlm_free_rsb(rsb);
791 		}
792 	}
793 
794 	vfree(ls->ls_rsbtbl);
795 
796 	while (!list_empty(&ls->ls_new_rsb)) {
797 		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
798 				       res_hashchain);
799 		list_del(&rsb->res_hashchain);
800 		dlm_free_rsb(rsb);
801 	}
802 
803 	/*
804 	 * Free structures on any other lists
805 	 */
806 
807 	dlm_purge_requestqueue(ls);
808 	kfree(ls->ls_recover_args);
809 	dlm_clear_free_entries(ls);
810 	dlm_clear_members(ls);
811 	dlm_clear_members_gone(ls);
812 	kfree(ls->ls_node_array);
813 	log_debug(ls, "release_lockspace final free");
814 	kobject_put(&ls->ls_kobj);
815 	/* The ls structure will be freed when the kobject is done with */
816 
817 	module_put(THIS_MODULE);
818 	return 0;
819 }
820 
821 /*
822  * Called when a system has released all its locks and is not going to use the
823  * lockspace any longer.  We free everything we're managing for this lockspace.
824  * Remaining nodes will go through the recovery process as if we'd died.  The
825  * lockspace must continue to function as usual, participating in recoveries,
826  * until this returns.
827  *
828  * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
830  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
831  * 2 - destroy lockspace regardless of LKBs
832  * 3 - destroy lockspace as part of a forced shutdown
833  */
834 
dlm_release_lockspace(void * lockspace,int force)835 int dlm_release_lockspace(void *lockspace, int force)
836 {
837 	struct dlm_ls *ls;
838 	int error;
839 
840 	ls = dlm_find_lockspace_local(lockspace);
841 	if (!ls)
842 		return -EINVAL;
843 	dlm_put_lockspace(ls);
844 
845 	mutex_lock(&ls_lock);
846 	error = release_lockspace(ls, force);
847 	if (!error)
848 		ls_count--;
849 	if (!ls_count)
850 		threads_stop();
851 	mutex_unlock(&ls_lock);
852 
853 	return error;
854 }
855 
dlm_stop_lockspaces(void)856 void dlm_stop_lockspaces(void)
857 {
858 	struct dlm_ls *ls;
859 
860  restart:
861 	spin_lock(&lslist_lock);
862 	list_for_each_entry(ls, &lslist, ls_list) {
863 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
864 			continue;
865 		spin_unlock(&lslist_lock);
866 		log_error(ls, "no userland control daemon, stopping lockspace");
867 		dlm_ls_stop(ls);
868 		goto restart;
869 	}
870 	spin_unlock(&lslist_lock);
871 }
872 
873