Lines Matching refs:dlm

38 static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node);
41 static int dlm_do_recovery(struct dlm_ctxt *dlm);
43 static int dlm_pick_recovery_master(struct dlm_ctxt *dlm);
44 static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node);
45 static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node);
46 static int dlm_request_all_locks(struct dlm_ctxt *dlm,
48 static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm);
55 static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
60 static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
63 static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm);
64 static int dlm_send_all_done_msg(struct dlm_ctxt *dlm,
66 static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node);
67 static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
69 static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
77 static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
100 static inline void dlm_set_reco_dead_node(struct dlm_ctxt *dlm, in dlm_set_reco_dead_node() argument
103 assert_spin_locked(&dlm->spinlock); in dlm_set_reco_dead_node()
104 if (dlm->reco.dead_node != dead_node) in dlm_set_reco_dead_node()
106 dlm->name, dlm->reco.dead_node, dead_node); in dlm_set_reco_dead_node()
107 dlm->reco.dead_node = dead_node; in dlm_set_reco_dead_node()
110 static inline void dlm_set_reco_master(struct dlm_ctxt *dlm, in dlm_set_reco_master() argument
113 assert_spin_locked(&dlm->spinlock); in dlm_set_reco_master()
115 dlm->name, dlm->reco.new_master, master); in dlm_set_reco_master()
116 dlm->reco.new_master = master; in dlm_set_reco_master()
119 static inline void __dlm_reset_recovery(struct dlm_ctxt *dlm) in __dlm_reset_recovery() argument
121 assert_spin_locked(&dlm->spinlock); in __dlm_reset_recovery()
122 clear_bit(dlm->reco.dead_node, dlm->recovery_map); in __dlm_reset_recovery()
123 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); in __dlm_reset_recovery()
124 dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM); in __dlm_reset_recovery()
130 struct dlm_ctxt *dlm = in dlm_dispatch_work() local
137 spin_lock(&dlm->work_lock); in dlm_dispatch_work()
138 list_splice_init(&dlm->work_list, &tmp_list); in dlm_dispatch_work()
139 spin_unlock(&dlm->work_lock); in dlm_dispatch_work()
144 mlog(0, "%s: work thread has %d work items\n", dlm->name, tot); in dlm_dispatch_work()
152 BUG_ON(item->dlm != dlm); in dlm_dispatch_work()
158 dlm_put(dlm); in dlm_dispatch_work()
167 void dlm_kick_recovery_thread(struct dlm_ctxt *dlm) in dlm_kick_recovery_thread() argument
175 wake_up(&dlm->dlm_reco_thread_wq); in dlm_kick_recovery_thread()
179 int dlm_launch_recovery_thread(struct dlm_ctxt *dlm) in dlm_launch_recovery_thread() argument
183 dlm->dlm_reco_thread_task = kthread_run(dlm_recovery_thread, dlm, in dlm_launch_recovery_thread()
184 "dlm_reco-%s", dlm->name); in dlm_launch_recovery_thread()
185 if (IS_ERR(dlm->dlm_reco_thread_task)) { in dlm_launch_recovery_thread()
186 mlog_errno(PTR_ERR(dlm->dlm_reco_thread_task)); in dlm_launch_recovery_thread()
187 dlm->dlm_reco_thread_task = NULL; in dlm_launch_recovery_thread()
194 void dlm_complete_recovery_thread(struct dlm_ctxt *dlm) in dlm_complete_recovery_thread() argument
196 if (dlm->dlm_reco_thread_task) { in dlm_complete_recovery_thread()
198 kthread_stop(dlm->dlm_reco_thread_task); in dlm_complete_recovery_thread()
199 dlm->dlm_reco_thread_task = NULL; in dlm_complete_recovery_thread()
228 static void dlm_print_reco_node_status(struct dlm_ctxt *dlm) in dlm_print_reco_node_status() argument
234 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), in dlm_print_reco_node_status()
235 dlm->reco.state & DLM_RECO_STATE_ACTIVE ? "ACTIVE" : "inactive", in dlm_print_reco_node_status()
236 dlm->reco.dead_node, dlm->reco.new_master); in dlm_print_reco_node_status()
238 list_for_each_entry(ndata, &dlm->reco.node_data, list) { in dlm_print_reco_node_status()
267 dlm->name, ndata->node_num, st); in dlm_print_reco_node_status()
269 list_for_each_entry(res, &dlm->reco.resources, recovering) { in dlm_print_reco_node_status()
271 dlm->name, res->lockname.len, res->lockname.name); in dlm_print_reco_node_status()
280 struct dlm_ctxt *dlm = data; in dlm_recovery_thread() local
283 mlog(0, "dlm thread running for %s...\n", dlm->name); in dlm_recovery_thread()
286 if (dlm_domain_fully_joined(dlm)) { in dlm_recovery_thread()
287 status = dlm_do_recovery(dlm); in dlm_recovery_thread()
296 wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq, in dlm_recovery_thread()
306 static int dlm_reco_master_ready(struct dlm_ctxt *dlm) in dlm_reco_master_ready() argument
309 spin_lock(&dlm->spinlock); in dlm_reco_master_ready()
310 ready = (dlm->reco.new_master != O2NM_INVALID_NODE_NUM); in dlm_reco_master_ready()
311 spin_unlock(&dlm->spinlock); in dlm_reco_master_ready()
317 int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node) in dlm_is_node_dead() argument
320 spin_lock(&dlm->spinlock); in dlm_is_node_dead()
321 dead = !test_bit(node, dlm->domain_map); in dlm_is_node_dead()
322 spin_unlock(&dlm->spinlock); in dlm_is_node_dead()
328 static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node) in dlm_is_node_recovered() argument
331 spin_lock(&dlm->spinlock); in dlm_is_node_recovered()
332 recovered = !test_bit(node, dlm->recovery_map); in dlm_is_node_recovered()
333 spin_unlock(&dlm->spinlock); in dlm_is_node_recovered()
338 void dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout) in dlm_wait_for_node_death() argument
340 if (dlm_is_node_dead(dlm, node)) in dlm_wait_for_node_death()
344 "domain %s\n", node, dlm->name); in dlm_wait_for_node_death()
347 wait_event_timeout(dlm->dlm_reco_thread_wq, in dlm_wait_for_node_death()
348 dlm_is_node_dead(dlm, node), in dlm_wait_for_node_death()
351 wait_event(dlm->dlm_reco_thread_wq, in dlm_wait_for_node_death()
352 dlm_is_node_dead(dlm, node)); in dlm_wait_for_node_death()
355 void dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout) in dlm_wait_for_node_recovery() argument
357 if (dlm_is_node_recovered(dlm, node)) in dlm_wait_for_node_recovery()
361 "domain %s\n", node, dlm->name); in dlm_wait_for_node_recovery()
364 wait_event_timeout(dlm->dlm_reco_thread_wq, in dlm_wait_for_node_recovery()
365 dlm_is_node_recovered(dlm, node), in dlm_wait_for_node_recovery()
368 wait_event(dlm->dlm_reco_thread_wq, in dlm_wait_for_node_recovery()
369 dlm_is_node_recovered(dlm, node)); in dlm_wait_for_node_recovery()
378 static int dlm_in_recovery(struct dlm_ctxt *dlm) in dlm_in_recovery() argument
381 spin_lock(&dlm->spinlock); in dlm_in_recovery()
382 in_recovery = !!(dlm->reco.state & DLM_RECO_STATE_ACTIVE); in dlm_in_recovery()
383 spin_unlock(&dlm->spinlock); in dlm_in_recovery()
388 void dlm_wait_for_recovery(struct dlm_ctxt *dlm) in dlm_wait_for_recovery() argument
390 if (dlm_in_recovery(dlm)) { in dlm_wait_for_recovery()
393 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), in dlm_wait_for_recovery()
394 dlm->reco.state, dlm->reco.new_master, in dlm_wait_for_recovery()
395 dlm->reco.dead_node); in dlm_wait_for_recovery()
397 wait_event(dlm->reco.event, !dlm_in_recovery(dlm)); in dlm_wait_for_recovery()
400 static void dlm_begin_recovery(struct dlm_ctxt *dlm) in dlm_begin_recovery() argument
402 assert_spin_locked(&dlm->spinlock); in dlm_begin_recovery()
403 BUG_ON(dlm->reco.state & DLM_RECO_STATE_ACTIVE); in dlm_begin_recovery()
405 dlm->name, dlm->reco.dead_node); in dlm_begin_recovery()
406 dlm->reco.state |= DLM_RECO_STATE_ACTIVE; in dlm_begin_recovery()
409 static void dlm_end_recovery(struct dlm_ctxt *dlm) in dlm_end_recovery() argument
411 spin_lock(&dlm->spinlock); in dlm_end_recovery()
412 BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_ACTIVE)); in dlm_end_recovery()
413 dlm->reco.state &= ~DLM_RECO_STATE_ACTIVE; in dlm_end_recovery()
414 spin_unlock(&dlm->spinlock); in dlm_end_recovery()
415 printk(KERN_NOTICE "o2dlm: End recovery on domain %s\n", dlm->name); in dlm_end_recovery()
416 wake_up(&dlm->reco.event); in dlm_end_recovery()
419 static void dlm_print_recovery_master(struct dlm_ctxt *dlm) in dlm_print_recovery_master() argument
422 "dead node %u in domain %s\n", dlm->reco.new_master, in dlm_print_recovery_master()
423 (dlm->node_num == dlm->reco.new_master ? "me" : "he"), in dlm_print_recovery_master()
424 dlm->reco.dead_node, dlm->name); in dlm_print_recovery_master()
427 static int dlm_do_recovery(struct dlm_ctxt *dlm) in dlm_do_recovery() argument
432 spin_lock(&dlm->spinlock); in dlm_do_recovery()
434 if (dlm->migrate_done) { in dlm_do_recovery()
436 "lock resources\n", dlm->name); in dlm_do_recovery()
437 spin_unlock(&dlm->spinlock); in dlm_do_recovery()
442 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM && in dlm_do_recovery()
443 test_bit(dlm->reco.new_master, dlm->recovery_map)) { in dlm_do_recovery()
445 dlm->reco.new_master, dlm->reco.dead_node); in dlm_do_recovery()
447 dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM); in dlm_do_recovery()
451 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { in dlm_do_recovery()
454 bit = find_first_bit(dlm->recovery_map, O2NM_MAX_NODES); in dlm_do_recovery()
456 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); in dlm_do_recovery()
458 dlm_set_reco_dead_node(dlm, bit); in dlm_do_recovery()
459 } else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) { in dlm_do_recovery()
462 dlm->reco.dead_node); in dlm_do_recovery()
463 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); in dlm_do_recovery()
466 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { in dlm_do_recovery()
468 spin_unlock(&dlm->spinlock); in dlm_do_recovery()
473 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), in dlm_do_recovery()
474 dlm->reco.dead_node); in dlm_do_recovery()
478 dlm_begin_recovery(dlm); in dlm_do_recovery()
480 spin_unlock(&dlm->spinlock); in dlm_do_recovery()
482 if (dlm->reco.new_master == dlm->node_num) in dlm_do_recovery()
485 if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) { in dlm_do_recovery()
490 ret = dlm_pick_recovery_master(dlm); in dlm_do_recovery()
498 dlm_print_recovery_master(dlm); in dlm_do_recovery()
503 dlm_end_recovery(dlm); in dlm_do_recovery()
509 dlm_print_recovery_master(dlm); in dlm_do_recovery()
511 status = dlm_remaster_locks(dlm, dlm->reco.dead_node); in dlm_do_recovery()
515 "retrying.\n", dlm->name, status, dlm->reco.dead_node); in dlm_do_recovery()
522 dlm->name, dlm->reco.dead_node, dlm->node_num); in dlm_do_recovery()
523 spin_lock(&dlm->spinlock); in dlm_do_recovery()
524 __dlm_reset_recovery(dlm); in dlm_do_recovery()
525 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; in dlm_do_recovery()
526 spin_unlock(&dlm->spinlock); in dlm_do_recovery()
528 dlm_end_recovery(dlm); in dlm_do_recovery()
534 static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) in dlm_remaster_locks() argument
545 status = dlm_init_recovery_area(dlm, dead_node); in dlm_remaster_locks()
548 "retrying\n", dlm->name); in dlm_remaster_locks()
555 list_for_each_entry(ndata, &dlm->reco.node_data, list) { in dlm_remaster_locks()
559 mlog(0, "%s: Requesting lock info from node %u\n", dlm->name, in dlm_remaster_locks()
562 if (ndata->node_num == dlm->node_num) { in dlm_remaster_locks()
568 status = dlm_request_all_locks(dlm, ndata->node_num, in dlm_remaster_locks()
578 wait_event_timeout(dlm->dlm_reco_thread_wq, in dlm_remaster_locks()
579 dlm_is_node_dead(dlm, in dlm_remaster_locks()
584 dlm_is_node_dead(dlm, ndata->node_num) ? in dlm_remaster_locks()
591 dlm->name, ndata->node_num, in dlm_remaster_locks()
632 mlog(0, "%s: Done requesting all lock info\n", dlm->name); in dlm_remaster_locks()
642 list_for_each_entry(ndata, &dlm->reco.node_data, list) { in dlm_remaster_locks()
662 dlm->name, ndata->node_num, in dlm_remaster_locks()
669 dlm->name, ndata->node_num); in dlm_remaster_locks()
673 dlm->name, ndata->node_num); in dlm_remaster_locks()
688 spin_lock(&dlm->spinlock); in dlm_remaster_locks()
689 dlm->reco.state |= DLM_RECO_STATE_FINALIZE; in dlm_remaster_locks()
690 spin_unlock(&dlm->spinlock); in dlm_remaster_locks()
696 ret = dlm_send_finalize_reco_message(dlm); in dlm_remaster_locks()
700 spin_lock(&dlm->spinlock); in dlm_remaster_locks()
701 dlm_finish_local_lockres_recovery(dlm, dead_node, in dlm_remaster_locks()
702 dlm->node_num); in dlm_remaster_locks()
703 spin_unlock(&dlm->spinlock); in dlm_remaster_locks()
707 "dead=%u, this=%u, new=%u\n", dlm->name, in dlm_remaster_locks()
708 jiffies, dlm->reco.dead_node, in dlm_remaster_locks()
709 dlm->node_num, dlm->reco.new_master); in dlm_remaster_locks()
713 dlm_kick_thread(dlm, NULL); in dlm_remaster_locks()
718 wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq, in dlm_remaster_locks()
725 dlm_destroy_recovery_area(dlm); in dlm_remaster_locks()
730 static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node) in dlm_init_recovery_area() argument
735 spin_lock(&dlm->spinlock); in dlm_init_recovery_area()
736 memcpy(dlm->reco.node_map, dlm->domain_map, sizeof(dlm->domain_map)); in dlm_init_recovery_area()
739 spin_unlock(&dlm->spinlock); in dlm_init_recovery_area()
742 num = find_next_bit (dlm->reco.node_map, O2NM_MAX_NODES, num); in dlm_init_recovery_area()
750 dlm_destroy_recovery_area(dlm); in dlm_init_recovery_area()
756 list_add_tail(&ndata->list, &dlm->reco.node_data); in dlm_init_recovery_area()
764 static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm) in dlm_destroy_recovery_area() argument
770 list_splice_init(&dlm->reco.node_data, &tmplist); in dlm_destroy_recovery_area()
779 static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from, in dlm_request_all_locks() argument
793 lr.node_idx = dlm->node_num; in dlm_request_all_locks()
797 ret = o2net_send_message(DLM_LOCK_REQUEST_MSG, dlm->key, in dlm_request_all_locks()
803 "to recover dead node %u\n", dlm->name, ret, in dlm_request_all_locks()
816 struct dlm_ctxt *dlm = data; in dlm_request_all_locks_handler() local
821 if (!dlm_grab(dlm)) in dlm_request_all_locks_handler()
824 if (lr->dead_node != dlm->reco.dead_node) { in dlm_request_all_locks_handler()
826 "dead_node is %u\n", dlm->name, lr->node_idx, in dlm_request_all_locks_handler()
827 lr->dead_node, dlm->reco.dead_node); in dlm_request_all_locks_handler()
828 dlm_print_reco_node_status(dlm); in dlm_request_all_locks_handler()
830 dlm_put(dlm); in dlm_request_all_locks_handler()
833 BUG_ON(lr->dead_node != dlm->reco.dead_node); in dlm_request_all_locks_handler()
837 dlm_put(dlm); in dlm_request_all_locks_handler()
845 dlm_put(dlm); in dlm_request_all_locks_handler()
850 dlm_grab(dlm); /* get an extra ref for the work item */ in dlm_request_all_locks_handler()
851 dlm_init_work_item(dlm, item, dlm_request_all_locks_worker, buf); in dlm_request_all_locks_handler()
854 spin_lock(&dlm->work_lock); in dlm_request_all_locks_handler()
855 list_add_tail(&item->list, &dlm->work_list); in dlm_request_all_locks_handler()
856 spin_unlock(&dlm->work_lock); in dlm_request_all_locks_handler()
857 queue_work(dlm->dlm_worker, &dlm->dispatched_work); in dlm_request_all_locks_handler()
859 dlm_put(dlm); in dlm_request_all_locks_handler()
867 struct dlm_ctxt *dlm; in dlm_request_all_locks_worker() local
873 dlm = item->dlm; in dlm_request_all_locks_worker()
879 dlm->name, dead_node, reco_master); in dlm_request_all_locks_worker()
881 if (dead_node != dlm->reco.dead_node || in dlm_request_all_locks_worker()
882 reco_master != dlm->reco.new_master) { in dlm_request_all_locks_worker()
885 if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) { in dlm_request_all_locks_worker()
888 " current=(dead=%u,mas=%u)\n", dlm->name, in dlm_request_all_locks_worker()
890 dlm->reco.dead_node, dlm->reco.new_master); in dlm_request_all_locks_worker()
894 dlm->name, dlm->reco.dead_node, in dlm_request_all_locks_worker()
895 dlm->reco.new_master, dead_node, reco_master); in dlm_request_all_locks_worker()
906 dlm_move_reco_locks_to_list(dlm, &resources, dead_node); in dlm_request_all_locks_worker()
913 ret = dlm_send_one_lockres(dlm, res, mres, reco_master, in dlm_request_all_locks_worker()
917 "recovery state for dead node %u, ret=%d\n", dlm->name, in dlm_request_all_locks_worker()
925 spin_lock(&dlm->spinlock); in dlm_request_all_locks_worker()
926 list_splice_init(&resources, &dlm->reco.resources); in dlm_request_all_locks_worker()
927 spin_unlock(&dlm->spinlock); in dlm_request_all_locks_worker()
930 ret = dlm_send_all_done_msg(dlm, dead_node, reco_master); in dlm_request_all_locks_worker()
934 dlm->name, reco_master, dead_node, ret); in dlm_request_all_locks_worker()
942 static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to) in dlm_send_all_done_msg() argument
948 done_msg.node_idx = dlm->node_num; in dlm_send_all_done_msg()
954 ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg, in dlm_send_all_done_msg()
958 "to recover dead node %u\n", dlm->name, ret, send_to, in dlm_send_all_done_msg()
972 struct dlm_ctxt *dlm = data; in dlm_reco_data_done_handler() local
977 if (!dlm_grab(dlm)) in dlm_reco_data_done_handler()
982 dlm->reco.dead_node, done->node_idx, dlm->node_num); in dlm_reco_data_done_handler()
984 mlog_bug_on_msg((done->dead_node != dlm->reco.dead_node), in dlm_reco_data_done_handler()
987 dlm->reco.dead_node, done->node_idx, dlm->node_num); in dlm_reco_data_done_handler()
990 list_for_each_entry(ndata, &dlm->reco.node_data, list) { in dlm_reco_data_done_handler()
1023 dlm_kick_recovery_thread(dlm); in dlm_reco_data_done_handler()
1028 dlm_put(dlm); in dlm_reco_data_done_handler()
1034 static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm, in dlm_move_reco_locks_to_list() argument
1041 spin_lock(&dlm->spinlock); in dlm_move_reco_locks_to_list()
1042 list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) { in dlm_move_reco_locks_to_list()
1053 dead_node, dlm->name); in dlm_move_reco_locks_to_list()
1077 spin_unlock(&dlm->spinlock); in dlm_move_reco_locks_to_list()
1095 static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm, in dlm_send_mig_lockres_msg() argument
1118 dlm->name, res->lockname.len, res->lockname.name, in dlm_send_mig_lockres_msg()
1123 ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres, in dlm_send_mig_lockres_msg()
1130 "node %u (%s)\n", dlm->name, mres->lockname_len, in dlm_send_mig_lockres_msg()
1231 static void dlm_add_dummy_lock(struct dlm_ctxt *dlm, in dlm_add_dummy_lock() argument
1241 dummy.ml.node = dlm->node_num; in dlm_add_dummy_lock()
1245 static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm, in dlm_is_dummy_lock() argument
1260 int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, in dlm_send_one_lockres() argument
1298 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, in dlm_send_one_lockres()
1307 dlm->name, res->lockname.len, res->lockname.name, in dlm_send_one_lockres()
1310 dlm_add_dummy_lock(dlm, mres); in dlm_send_one_lockres()
1313 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks); in dlm_send_one_lockres()
1320 dlm->name, ret); in dlm_send_one_lockres()
1324 "lockres %.*s\n", dlm->name, send_to, in dlm_send_one_lockres()
1348 struct dlm_ctxt *dlm = data; in dlm_mig_lockres_handler() local
1359 if (!dlm_grab(dlm)) in dlm_mig_lockres_handler()
1362 if (!dlm_joined(dlm)) { in dlm_mig_lockres_handler()
1365 dlm->name, mres->lockname_len, in dlm_mig_lockres_handler()
1367 dlm_put(dlm); in dlm_mig_lockres_handler()
1395 spin_lock(&dlm->spinlock); in dlm_mig_lockres_handler()
1396 res = __dlm_lookup_lockres_full(dlm, mres->lockname, mres->lockname_len, in dlm_mig_lockres_handler()
1405 " ref!\n", dlm->name, in dlm_mig_lockres_handler()
1409 spin_unlock(&dlm->spinlock); in dlm_mig_lockres_handler()
1430 spin_unlock(&dlm->spinlock); in dlm_mig_lockres_handler()
1437 spin_unlock(&dlm->spinlock); in dlm_mig_lockres_handler()
1439 spin_unlock(&dlm->spinlock); in dlm_mig_lockres_handler()
1442 res = dlm_new_lockres(dlm, mres->lockname, mres->lockname_len); in dlm_mig_lockres_handler()
1456 spin_lock(&dlm->spinlock); in dlm_mig_lockres_handler()
1457 __dlm_insert_lockres(dlm, res); in dlm_mig_lockres_handler()
1458 spin_unlock(&dlm->spinlock); in dlm_mig_lockres_handler()
1491 dlm_lockres_grab_inflight_ref(dlm, res); in dlm_mig_lockres_handler()
1501 dlm_change_lockres_owner(dlm, res, dlm->node_num); in dlm_mig_lockres_handler()
1506 dlm_grab(dlm); /* get an extra ref for the work item */ in dlm_mig_lockres_handler()
1508 dlm_init_work_item(dlm, item, dlm_mig_lockres_worker, buf); in dlm_mig_lockres_handler()
1512 spin_lock(&dlm->work_lock); in dlm_mig_lockres_handler()
1513 list_add_tail(&item->list, &dlm->work_list); in dlm_mig_lockres_handler()
1514 spin_unlock(&dlm->work_lock); in dlm_mig_lockres_handler()
1515 queue_work(dlm->dlm_worker, &dlm->dispatched_work); in dlm_mig_lockres_handler()
1522 dlm_put(dlm); in dlm_mig_lockres_handler()
1535 struct dlm_ctxt *dlm; in dlm_mig_lockres_worker() local
1542 dlm = item->dlm; in dlm_mig_lockres_worker()
1553 ret = dlm_lockres_master_requery(dlm, res, &real_master); in dlm_mig_lockres_worker()
1565 dlm_lockres_drop_inflight_ref(dlm, res); in dlm_mig_lockres_worker()
1576 ret = dlm_process_recovery_data(dlm, res, mres); in dlm_mig_lockres_worker()
1584 ret = dlm_finish_migration(dlm, res, mres->master); in dlm_mig_lockres_worker()
1601 static int dlm_lockres_master_requery(struct dlm_ctxt *dlm, in dlm_lockres_master_requery() argument
1634 spin_lock(&dlm->spinlock); in dlm_lockres_master_requery()
1635 dlm_node_iter_init(dlm->domain_map, &iter); in dlm_lockres_master_requery()
1636 spin_unlock(&dlm->spinlock); in dlm_lockres_master_requery()
1640 if (nodenum == dlm->node_num) in dlm_lockres_master_requery()
1642 ret = dlm_do_master_requery(dlm, res, nodenum, real_master); in dlm_lockres_master_requery()
1659 int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, in dlm_do_master_requery() argument
1667 req.node_idx = dlm->node_num; in dlm_do_master_requery()
1672 ret = o2net_send_message(DLM_MASTER_REQUERY_MSG, dlm->key, in dlm_do_master_requery()
1677 dlm->key, nodenum); in dlm_do_master_requery()
1700 struct dlm_ctxt *dlm = data; in dlm_master_requery_handler() local
1708 if (!dlm_grab(dlm)) { in dlm_master_requery_handler()
1716 spin_lock(&dlm->spinlock); in dlm_master_requery_handler()
1717 res = __dlm_lookup_lockres(dlm, req->name, req->namelen, hash); in dlm_master_requery_handler()
1721 if (master == dlm->node_num) { in dlm_master_requery_handler()
1722 int ret = dlm_dispatch_assert_master(dlm, res, in dlm_master_requery_handler()
1728 spin_unlock(&dlm->spinlock); in dlm_master_requery_handler()
1729 dlm_put(dlm); in dlm_master_requery_handler()
1734 __dlm_lockres_grab_inflight_worker(dlm, res); in dlm_master_requery_handler()
1743 spin_unlock(&dlm->spinlock); in dlm_master_requery_handler()
1746 dlm_put(dlm); in dlm_master_requery_handler()
1787 static int dlm_process_recovery_data(struct dlm_ctxt *dlm, in dlm_process_recovery_data() argument
1806 if (dlm_is_dummy_lock(dlm, ml, &from)) { in dlm_process_recovery_data()
1810 dlm->name, mres->lockname_len, mres->lockname, in dlm_process_recovery_data()
1813 dlm_lockres_set_refmap_bit(dlm, res, from); in dlm_process_recovery_data()
1827 if (ml->node == dlm->node_num) { in dlm_process_recovery_data()
1955 "lvb! type=%d\n", dlm->name, in dlm_process_recovery_data()
1995 "exists on this lockres!\n", dlm->name, in dlm_process_recovery_data()
2024 "setting refmap bit\n", dlm->name, in dlm_process_recovery_data()
2026 dlm_lockres_set_refmap_bit(dlm, res, ml->node); in dlm_process_recovery_data()
2035 dlm_lockres_drop_inflight_ref(dlm, res); in dlm_process_recovery_data()
2044 void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm, in dlm_move_lockres_to_recovery_list() argument
2051 assert_spin_locked(&dlm->spinlock); in dlm_move_lockres_to_recovery_list()
2057 dlm->name, res->lockname.len, res->lockname.name); in dlm_move_lockres_to_recovery_list()
2063 list_add_tail(&res->recovering, &dlm->reco.resources); in dlm_move_lockres_to_recovery_list()
2128 static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, in dlm_finish_local_lockres_recovery() argument
2135 assert_spin_locked(&dlm->spinlock); in dlm_finish_local_lockres_recovery()
2137 list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) { in dlm_finish_local_lockres_recovery()
2140 dlm->name, res->lockname.len, res->lockname.name, in dlm_finish_local_lockres_recovery()
2146 dlm_change_lockres_owner(dlm, res, new_master); in dlm_finish_local_lockres_recovery()
2149 __dlm_dirty_lockres(dlm, res); in dlm_finish_local_lockres_recovery()
2161 bucket = dlm_lockres_hash(dlm, i); in dlm_finish_local_lockres_recovery()
2174 res->owner != dlm->node_num) in dlm_finish_local_lockres_recovery()
2185 dlm->name, res->lockname.len, res->lockname.name, in dlm_finish_local_lockres_recovery()
2188 dlm_change_lockres_owner(dlm, res, new_master); in dlm_finish_local_lockres_recovery()
2191 __dlm_dirty_lockres(dlm, res); in dlm_finish_local_lockres_recovery()
2209 static void dlm_revalidate_lvb(struct dlm_ctxt *dlm, in dlm_revalidate_lvb() argument
2218 assert_spin_locked(&dlm->spinlock); in dlm_revalidate_lvb()
2221 if (res->owner == dlm->node_num) in dlm_revalidate_lvb()
2228 search_node = dlm->node_num; in dlm_revalidate_lvb()
2252 static void dlm_free_dead_locks(struct dlm_ctxt *dlm, in dlm_free_dead_locks() argument
2262 assert_spin_locked(&dlm->spinlock); in dlm_free_dead_locks()
2299 "dropping ref from lockres\n", dlm->name, in dlm_free_dead_locks()
2303 "but ref was not set\n", dlm->name, in dlm_free_dead_locks()
2308 dlm_lockres_clear_refmap_bit(dlm, res, dead_node); in dlm_free_dead_locks()
2311 "no locks and had not purged before dying\n", dlm->name, in dlm_free_dead_locks()
2313 dlm_lockres_clear_refmap_bit(dlm, res, dead_node); in dlm_free_dead_locks()
2317 __dlm_dirty_lockres(dlm, res); in dlm_free_dead_locks()
2320 static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) in dlm_do_local_recovery_cleanup() argument
2330 dlm_clean_master_list(dlm, dead_node); in dlm_do_local_recovery_cleanup()
2347 bucket = dlm_lockres_hash(dlm, i); in dlm_do_local_recovery_cleanup()
2359 dead_node, dlm->name); in dlm_do_local_recovery_cleanup()
2373 __dlm_do_purge_lockres(dlm, res); in dlm_do_local_recovery_cleanup()
2378 } else if (res->owner == dlm->node_num) in dlm_do_local_recovery_cleanup()
2379 dlm_lockres_clear_refmap_bit(dlm, res, dead_node); in dlm_do_local_recovery_cleanup()
2385 dlm_revalidate_lvb(dlm, res, dead_node); in dlm_do_local_recovery_cleanup()
2392 dlm->name, res->lockname.len, in dlm_do_local_recovery_cleanup()
2395 __dlm_do_purge_lockres(dlm, res); in dlm_do_local_recovery_cleanup()
2401 dlm_move_lockres_to_recovery_list(dlm, res); in dlm_do_local_recovery_cleanup()
2402 } else if (res->owner == dlm->node_num) { in dlm_do_local_recovery_cleanup()
2403 dlm_free_dead_locks(dlm, res, dead_node); in dlm_do_local_recovery_cleanup()
2404 __dlm_lockres_calc_usage(dlm, res); in dlm_do_local_recovery_cleanup()
2409 dlm->name, res->lockname.len, in dlm_do_local_recovery_cleanup()
2411 dlm_lockres_clear_refmap_bit(dlm, res, dead_node); in dlm_do_local_recovery_cleanup()
2420 static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx) in __dlm_hb_node_down() argument
2422 assert_spin_locked(&dlm->spinlock); in __dlm_hb_node_down()
2424 if (dlm->reco.new_master == idx) { in __dlm_hb_node_down()
2426 dlm->name, idx); in __dlm_hb_node_down()
2427 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { in __dlm_hb_node_down()
2432 "finalize1 state, clearing\n", dlm->name, idx); in __dlm_hb_node_down()
2433 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; in __dlm_hb_node_down()
2434 __dlm_reset_recovery(dlm); in __dlm_hb_node_down()
2439 if (dlm->joining_node == idx) { in __dlm_hb_node_down()
2441 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); in __dlm_hb_node_down()
2445 if (!test_bit(idx, dlm->live_nodes_map)) { in __dlm_hb_node_down()
2448 dlm->name, idx); in __dlm_hb_node_down()
2453 if (!test_bit(idx, dlm->domain_map)) { in __dlm_hb_node_down()
2460 clear_bit(idx, dlm->live_nodes_map); in __dlm_hb_node_down()
2463 if (!test_bit(idx, dlm->recovery_map)) in __dlm_hb_node_down()
2464 dlm_do_local_recovery_cleanup(dlm, idx); in __dlm_hb_node_down()
2467 dlm_hb_event_notify_attached(dlm, idx, 0); in __dlm_hb_node_down()
2470 clear_bit(idx, dlm->domain_map); in __dlm_hb_node_down()
2471 clear_bit(idx, dlm->exit_domain_map); in __dlm_hb_node_down()
2474 wake_up(&dlm->migration_wq); in __dlm_hb_node_down()
2476 set_bit(idx, dlm->recovery_map); in __dlm_hb_node_down()
2481 struct dlm_ctxt *dlm = data; in dlm_hb_node_down_cb() local
2483 if (!dlm_grab(dlm)) in dlm_hb_node_down_cb()
2490 if (test_bit(idx, dlm->domain_map)) in dlm_hb_node_down_cb()
2491 dlm_fire_domain_eviction_callbacks(dlm, idx); in dlm_hb_node_down_cb()
2493 spin_lock(&dlm->spinlock); in dlm_hb_node_down_cb()
2494 __dlm_hb_node_down(dlm, idx); in dlm_hb_node_down_cb()
2495 spin_unlock(&dlm->spinlock); in dlm_hb_node_down_cb()
2497 dlm_put(dlm); in dlm_hb_node_down_cb()
2502 struct dlm_ctxt *dlm = data; in dlm_hb_node_up_cb() local
2504 if (!dlm_grab(dlm)) in dlm_hb_node_up_cb()
2507 spin_lock(&dlm->spinlock); in dlm_hb_node_up_cb()
2508 set_bit(idx, dlm->live_nodes_map); in dlm_hb_node_up_cb()
2511 spin_unlock(&dlm->spinlock); in dlm_hb_node_up_cb()
2513 dlm_put(dlm); in dlm_hb_node_up_cb()
2518 struct dlm_ctxt *dlm = astdata; in dlm_reco_ast() local
2520 dlm->node_num, dlm->name); in dlm_reco_ast()
2524 struct dlm_ctxt *dlm = astdata; in dlm_reco_bast() local
2526 dlm->node_num, dlm->name); in dlm_reco_bast()
2545 static int dlm_pick_recovery_master(struct dlm_ctxt *dlm) in dlm_pick_recovery_master() argument
2552 dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num); in dlm_pick_recovery_master()
2556 ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY, in dlm_pick_recovery_master()
2558 dlm_reco_ast, dlm, dlm_reco_bast); in dlm_pick_recovery_master()
2561 dlm->name, ret, lksb.status); in dlm_pick_recovery_master()
2565 dlm->name, dlm->node_num); in dlm_pick_recovery_master()
2569 if (dlm_reco_master_ready(dlm)) { in dlm_pick_recovery_master()
2571 "do the recovery\n", dlm->name, in dlm_pick_recovery_master()
2572 dlm->reco.new_master); in dlm_pick_recovery_master()
2578 spin_lock(&dlm->spinlock); in dlm_pick_recovery_master()
2579 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { in dlm_pick_recovery_master()
2582 "node got recovered already\n", dlm->name); in dlm_pick_recovery_master()
2583 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) { in dlm_pick_recovery_master()
2586 dlm->name, dlm->reco.new_master); in dlm_pick_recovery_master()
2590 spin_unlock(&dlm->spinlock); in dlm_pick_recovery_master()
2597 "begin_reco now\n", dlm->name, in dlm_pick_recovery_master()
2598 dlm->reco.dead_node, dlm->node_num); in dlm_pick_recovery_master()
2599 status = dlm_send_begin_reco_message(dlm, in dlm_pick_recovery_master()
2600 dlm->reco.dead_node); in dlm_pick_recovery_master()
2605 spin_lock(&dlm->spinlock); in dlm_pick_recovery_master()
2606 dlm_set_reco_master(dlm, dlm->node_num); in dlm_pick_recovery_master()
2607 spin_unlock(&dlm->spinlock); in dlm_pick_recovery_master()
2612 ret = dlmunlock(dlm, &lksb, 0, dlm_reco_unlock_ast, dlm); in dlm_pick_recovery_master()
2615 ret = dlmunlock(dlm, &lksb, LKM_CANCEL, dlm_reco_unlock_ast, dlm); in dlm_pick_recovery_master()
2628 dlm->name, dlm->node_num); in dlm_pick_recovery_master()
2632 wait_event_timeout(dlm->dlm_reco_thread_wq, in dlm_pick_recovery_master()
2633 dlm_reco_master_ready(dlm), in dlm_pick_recovery_master()
2635 if (!dlm_reco_master_ready(dlm)) { in dlm_pick_recovery_master()
2637 dlm->name); in dlm_pick_recovery_master()
2642 dlm->name, dlm->reco.new_master, dlm->reco.dead_node); in dlm_pick_recovery_master()
2646 dlm->name, dlm->node_num); in dlm_pick_recovery_master()
2653 "lksb.status=%s\n", dlm->name, dlm_errname(ret), in dlm_pick_recovery_master()
2655 res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME, in dlm_pick_recovery_master()
2669 static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node) in dlm_send_begin_reco_message() argument
2677 mlog(0, "%s: dead node is %u\n", dlm->name, dead_node); in dlm_send_begin_reco_message()
2679 spin_lock(&dlm->spinlock); in dlm_send_begin_reco_message()
2680 dlm_node_iter_init(dlm->domain_map, &iter); in dlm_send_begin_reco_message()
2681 spin_unlock(&dlm->spinlock); in dlm_send_begin_reco_message()
2686 br.node_idx = dlm->node_num; in dlm_send_begin_reco_message()
2696 if (nodenum == dlm->node_num) { in dlm_send_begin_reco_message()
2703 ret = o2net_send_message(DLM_BEGIN_RECO_MSG, dlm->key, in dlm_send_begin_reco_message()
2712 "begin reco msg (%d)\n", dlm->name, nodenum, ret); in dlm_send_begin_reco_message()
2724 "to complete, backoff for a bit\n", dlm->name, in dlm_send_begin_reco_message()
2736 "returned %d\n", dlm->name, nodenum, ret); in dlm_send_begin_reco_message()
2737 res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME, in dlm_send_begin_reco_message()
2758 struct dlm_ctxt *dlm = data; in dlm_begin_reco_handler() local
2762 if (!dlm_grab(dlm)) in dlm_begin_reco_handler()
2765 spin_lock(&dlm->spinlock); in dlm_begin_reco_handler()
2766 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { in dlm_begin_reco_handler()
2769 dlm->name, br->node_idx, br->dead_node, in dlm_begin_reco_handler()
2770 dlm->reco.dead_node, dlm->reco.new_master); in dlm_begin_reco_handler()
2771 spin_unlock(&dlm->spinlock); in dlm_begin_reco_handler()
2772 dlm_put(dlm); in dlm_begin_reco_handler()
2775 spin_unlock(&dlm->spinlock); in dlm_begin_reco_handler()
2778 dlm->name, br->node_idx, br->dead_node, in dlm_begin_reco_handler()
2779 dlm->reco.dead_node, dlm->reco.new_master); in dlm_begin_reco_handler()
2781 dlm_fire_domain_eviction_callbacks(dlm, br->dead_node); in dlm_begin_reco_handler()
2783 spin_lock(&dlm->spinlock); in dlm_begin_reco_handler()
2784 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) { in dlm_begin_reco_handler()
2785 if (test_bit(dlm->reco.new_master, dlm->recovery_map)) { in dlm_begin_reco_handler()
2787 "to %u\n", dlm->name, dlm->reco.new_master, in dlm_begin_reco_handler()
2791 "to %u\n", dlm->name, dlm->reco.new_master, in dlm_begin_reco_handler()
2796 if (dlm->reco.dead_node != O2NM_INVALID_NODE_NUM) { in dlm_begin_reco_handler()
2798 "node %u changing it to %u\n", dlm->name, in dlm_begin_reco_handler()
2799 dlm->reco.dead_node, br->node_idx, br->dead_node); in dlm_begin_reco_handler()
2801 dlm_set_reco_master(dlm, br->node_idx); in dlm_begin_reco_handler()
2802 dlm_set_reco_dead_node(dlm, br->dead_node); in dlm_begin_reco_handler()
2803 if (!test_bit(br->dead_node, dlm->recovery_map)) { in dlm_begin_reco_handler()
2807 if (!test_bit(br->dead_node, dlm->domain_map) || in dlm_begin_reco_handler()
2808 !test_bit(br->dead_node, dlm->live_nodes_map)) in dlm_begin_reco_handler()
2814 set_bit(br->dead_node, dlm->domain_map); in dlm_begin_reco_handler()
2815 set_bit(br->dead_node, dlm->live_nodes_map); in dlm_begin_reco_handler()
2816 __dlm_hb_node_down(dlm, br->dead_node); in dlm_begin_reco_handler()
2818 spin_unlock(&dlm->spinlock); in dlm_begin_reco_handler()
2820 dlm_kick_recovery_thread(dlm); in dlm_begin_reco_handler()
2823 dlm->name, br->node_idx, br->dead_node, in dlm_begin_reco_handler()
2824 dlm->reco.dead_node, dlm->reco.new_master); in dlm_begin_reco_handler()
2826 dlm_put(dlm); in dlm_begin_reco_handler()
2831 static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm) in dlm_send_finalize_reco_message() argument
2841 "stage %d\n", dlm->name, dlm->reco.dead_node, stage); in dlm_send_finalize_reco_message()
2843 spin_lock(&dlm->spinlock); in dlm_send_finalize_reco_message()
2844 dlm_node_iter_init(dlm->domain_map, &iter); in dlm_send_finalize_reco_message()
2845 spin_unlock(&dlm->spinlock); in dlm_send_finalize_reco_message()
2849 fr.node_idx = dlm->node_num; in dlm_send_finalize_reco_message()
2850 fr.dead_node = dlm->reco.dead_node; in dlm_send_finalize_reco_message()
2855 if (nodenum == dlm->node_num) in dlm_send_finalize_reco_message()
2857 ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key, in dlm_send_finalize_reco_message()
2864 dlm->key, nodenum); in dlm_send_finalize_reco_message()
2890 struct dlm_ctxt *dlm = data; in dlm_finalize_reco_handler() local
2895 if (!dlm_grab(dlm)) in dlm_finalize_reco_handler()
2902 "node %u (%u:%u)\n", dlm->name, fr->node_idx, stage, in dlm_finalize_reco_handler()
2903 fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master); in dlm_finalize_reco_handler()
2905 spin_lock(&dlm->spinlock); in dlm_finalize_reco_handler()
2907 if (dlm->reco.new_master != fr->node_idx) { in dlm_finalize_reco_handler()
2910 fr->node_idx, dlm->reco.new_master, fr->dead_node); in dlm_finalize_reco_handler()
2913 if (dlm->reco.dead_node != fr->dead_node) { in dlm_finalize_reco_handler()
2916 fr->node_idx, fr->dead_node, dlm->reco.dead_node); in dlm_finalize_reco_handler()
2922 dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx); in dlm_finalize_reco_handler()
2923 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { in dlm_finalize_reco_handler()
2927 dlm->name, fr->node_idx, fr->dead_node); in dlm_finalize_reco_handler()
2928 dlm_print_reco_node_status(dlm); in dlm_finalize_reco_handler()
2931 dlm->reco.state |= DLM_RECO_STATE_FINALIZE; in dlm_finalize_reco_handler()
2932 spin_unlock(&dlm->spinlock); in dlm_finalize_reco_handler()
2935 if (!(dlm->reco.state & DLM_RECO_STATE_FINALIZE)) { in dlm_finalize_reco_handler()
2939 dlm->name, fr->node_idx, fr->dead_node); in dlm_finalize_reco_handler()
2940 dlm_print_reco_node_status(dlm); in dlm_finalize_reco_handler()
2943 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; in dlm_finalize_reco_handler()
2944 __dlm_reset_recovery(dlm); in dlm_finalize_reco_handler()
2945 spin_unlock(&dlm->spinlock); in dlm_finalize_reco_handler()
2946 dlm_kick_recovery_thread(dlm); in dlm_finalize_reco_handler()
2951 dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master); in dlm_finalize_reco_handler()
2953 dlm_put(dlm); in dlm_finalize_reco_handler()