1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 ** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved.
6 **
7 **
8 *******************************************************************************
9 ******************************************************************************/
10
11 /* Central locking logic has four stages:
12
13 dlm_lock()
14 dlm_unlock()
15
16 request_lock(ls, lkb)
17 convert_lock(ls, lkb)
18 unlock_lock(ls, lkb)
19 cancel_lock(ls, lkb)
20
21 _request_lock(r, lkb)
22 _convert_lock(r, lkb)
23 _unlock_lock(r, lkb)
24 _cancel_lock(r, lkb)
25
26 do_request(r, lkb)
27 do_convert(r, lkb)
28 do_unlock(r, lkb)
29 do_cancel(r, lkb)
30
31 Stage 1 (lock, unlock) is mainly about checking input args and
32 splitting into one of the four main operations:
33
34 dlm_lock = request_lock
35 dlm_lock+CONVERT = convert_lock
36 dlm_unlock = unlock_lock
37 dlm_unlock+CANCEL = cancel_lock
38
39 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40 provided to the next stage.
41
42 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43 When remote, it calls send_xxxx(), when local it calls do_xxxx().
44
45 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
46 given rsb and lkb and queues callbacks.
47
48 For remote operations, send_xxxx() results in the corresponding do_xxxx()
49 function being executed on the remote node. The connecting send/receive
50 calls on local (L) and remote (R) nodes:
51
52 L: send_xxxx() -> R: receive_xxxx()
53 R: do_xxxx()
54 L: receive_xxxx_reply() <- R: send_xxxx_reply()
55 */
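
/* Illustrative sketch (hypothetical caller, not code from this file): a
   request for a new EX lock enters at stage 1 roughly as

	error = dlm_lock(lockspace, DLM_LOCK_EX, &lksb, 0, name, namelen,
			 0, ast_fn, astarg, bast_fn);

   and dlm_lock() routes it to request_lock() -> _request_lock(), ending in
   do_request() when this node masters the rsb, or in send_request() and
   receive_request_reply() when the master is remote.  dlm_unlock() follows
   the unlock_lock()/cancel_lock() legs in the same way. */
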
56 #include <trace/events/dlm.h>
57
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "midcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89 struct dlm_message *ms);
90 static int receive_extralen(struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void del_timeout(struct dlm_lkb *lkb);
93 static void toss_rsb(struct kref *kref);
94
95 /*
96 * Lock compatibility matrix - thanks Steve
97 * UN = Unlocked state. Not really a state, used as a flag
98 * PD = Padding. Used to make the matrix a nice power of two in size
99 * Other states are the same as the VMS DLM.
100 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
101 */
102
103 static const int __dlm_compat_matrix[8][8] = {
104 /* UN NL CR CW PR PW EX PD */
105 {1, 1, 1, 1, 1, 1, 1, 0}, /* UN */
106 {1, 1, 1, 1, 1, 1, 1, 0}, /* NL */
107 {1, 1, 1, 1, 1, 1, 0, 0}, /* CR */
108 {1, 1, 1, 1, 0, 0, 0, 0}, /* CW */
109 {1, 1, 1, 0, 1, 0, 0, 0}, /* PR */
110 {1, 1, 1, 0, 0, 0, 0, 0}, /* PW */
111 {1, 1, 0, 0, 0, 0, 0, 0}, /* EX */
112 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
113 };
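
/* For example, with the usual DLM mode values (DLM_LOCK_CW == 2,
   DLM_LOCK_PR == 3), the table above reads:

	__dlm_compat_matrix[DLM_LOCK_PR + 1][DLM_LOCK_CW + 1] == 0   granted PR blocks a CW request
	__dlm_compat_matrix[DLM_LOCK_PR + 1][DLM_LOCK_PR + 1] == 1   PR can be granted alongside PR

   which is what dlm_modes_compat() and the modes_compat() macro below
   return for those mode pairs. */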
114
115 /*
116 * This defines the direction of transfer of LVB data.
117 * Granted mode is the row; requested mode is the column.
118 * Usage: matrix[grmode+1][rqmode+1]
119 * 1 = LVB is returned to the caller
120 * 0 = LVB is written to the resource
121 * -1 = nothing happens to the LVB
122 */
123
124 const int dlm_lvb_operations[8][8] = {
125 /* UN NL CR CW PR PW EX PD*/
126 { -1, 1, 1, 1, 1, 1, 1, -1 }, /* UN */
127 { -1, 1, 1, 1, 1, 1, 1, 0 }, /* NL */
128 { -1, -1, 1, 1, 1, 1, 1, 0 }, /* CR */
129 { -1, -1, -1, 1, 1, 1, 1, 0 }, /* CW */
130 { -1, -1, -1, -1, 1, 1, 1, 0 }, /* PR */
131 { -1, 0, 0, 0, 0, 0, 1, 0 }, /* PW */
132 { -1, 0, 0, 0, 0, 0, 0, 0 }, /* EX */
133 { -1, 0, 0, 0, 0, 0, 0, 0 } /* PD */
134 };
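
/* For example, reading the table above: a conversion with granted mode PW
   and requested mode NL has dlm_lvb_operations[DLM_LOCK_PW + 1][DLM_LOCK_NL + 1]
   == 0, so the lock's LVB is written to the resource, while converting from
   granted CR up to EX gives 1, so the resource's LVB is returned to the
   caller.  (Index arithmetic only; the LVB handling later in this file
   consumes these values.) */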
135
136 #define modes_compat(gr, rq) \
137 __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
138
139 int dlm_modes_compat(int mode1, int mode2)
140 {
141 return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
142 }
143
144 /*
145 * Compatibility matrix for conversions with QUECVT set.
146 * Granted mode is the row; requested mode is the column.
147 * Usage: matrix[grmode+1][rqmode+1]
148 */
149
150 static const int __quecvt_compat_matrix[8][8] = {
151 /* UN NL CR CW PR PW EX PD */
152 {0, 0, 0, 0, 0, 0, 0, 0}, /* UN */
153 {0, 0, 1, 1, 1, 1, 1, 0}, /* NL */
154 {0, 0, 0, 1, 1, 1, 1, 0}, /* CR */
155 {0, 0, 0, 0, 1, 1, 1, 0}, /* CW */
156 {0, 0, 0, 1, 0, 1, 1, 0}, /* PR */
157 {0, 0, 0, 0, 0, 0, 1, 0}, /* PW */
158 {0, 0, 0, 0, 0, 0, 0, 0}, /* EX */
159 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
160 };
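
/* Example reading of the table above: a conversion from granted NL to
   requested EX finds __quecvt_compat_matrix[DLM_LOCK_NL + 1][DLM_LOCK_EX + 1]
   == 1, while every entry in the EX row is 0.  The conversion-grant checks
   later in this file consult this table when DLM_LKF_QUECVT is set. */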
161
162 void dlm_print_lkb(struct dlm_lkb *lkb)
163 {
164 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
165 "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
166 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
167 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
168 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
169 (unsigned long long)lkb->lkb_recover_seq);
170 }
171
172 static void dlm_print_rsb(struct dlm_rsb *r)
173 {
174 printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
175 "rlc %d name %s\n",
176 r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
177 r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
178 r->res_name);
179 }
180
181 void dlm_dump_rsb(struct dlm_rsb *r)
182 {
183 struct dlm_lkb *lkb;
184
185 dlm_print_rsb(r);
186
187 printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
188 list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
189 printk(KERN_ERR "rsb lookup list\n");
190 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
191 dlm_print_lkb(lkb);
192 printk(KERN_ERR "rsb grant queue:\n");
193 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
194 dlm_print_lkb(lkb);
195 printk(KERN_ERR "rsb convert queue:\n");
196 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
197 dlm_print_lkb(lkb);
198 printk(KERN_ERR "rsb wait queue:\n");
199 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
200 dlm_print_lkb(lkb);
201 }
202
203 /* Threads cannot use the lockspace while it's being recovered */
204
205 static inline void dlm_lock_recovery(struct dlm_ls *ls)
206 {
207 down_read(&ls->ls_in_recovery);
208 }
209
210 void dlm_unlock_recovery(struct dlm_ls *ls)
211 {
212 up_read(&ls->ls_in_recovery);
213 }
214
215 int dlm_lock_recovery_try(struct dlm_ls *ls)
216 {
217 return down_read_trylock(&ls->ls_in_recovery);
218 }
219
220 static inline int can_be_queued(struct dlm_lkb *lkb)
221 {
222 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
223 }
224
225 static inline int force_blocking_asts(struct dlm_lkb *lkb)
226 {
227 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
228 }
229
230 static inline int is_demoted(struct dlm_lkb *lkb)
231 {
232 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
233 }
234
235 static inline int is_altmode(struct dlm_lkb *lkb)
236 {
237 return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
238 }
239
240 static inline int is_granted(struct dlm_lkb *lkb)
241 {
242 return (lkb->lkb_status == DLM_LKSTS_GRANTED);
243 }
244
245 static inline int is_remote(struct dlm_rsb *r)
246 {
247 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
248 return !!r->res_nodeid;
249 }
250
251 static inline int is_process_copy(struct dlm_lkb *lkb)
252 {
253 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
254 }
255
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
259 }
260
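/* Note: PR and CW sit at the same level: each is compatible with the modes
   below it but not with the other (see __dlm_compat_matrix above), so a
   PR<->CW conversion is neither purely up nor purely down.  down_conversion()
   below excludes this case even though CW is numerically lower than PR. */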
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265 return 1;
266 return 0;
267 }
268
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276 return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
277 }
278
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281 return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
282 }
283
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286 return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
287 DLM_IFL_OVERLAP_CANCEL));
288 }
289
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292 if (is_master_copy(lkb))
293 return;
294
295 del_timeout(lkb);
296
297 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
298
299 /* if the operation was a cancel, then return -DLM_ECANCEL, if a
300 timeout caused the cancel then return -ETIMEDOUT */
301 if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
302 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
303 rv = -ETIMEDOUT;
304 }
305
306 if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
307 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
308 rv = -EDEADLK;
309 }
310
311 dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
312 }
313
314 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
315 {
316 queue_cast(r, lkb,
317 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
318 }
319
320 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
321 {
322 if (is_master_copy(lkb)) {
323 send_bast(r, lkb, rqmode);
324 } else {
325 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
326 }
327 }
328
329 /*
330 * Basic operations on rsb's and lkb's
331 */
332
333 /* This is only called to add a reference when the code already holds
334 a valid reference to the rsb, so there's no need for locking. */
335
336 static inline void hold_rsb(struct dlm_rsb *r)
337 {
338 kref_get(&r->res_ref);
339 }
340
341 void dlm_hold_rsb(struct dlm_rsb *r)
342 {
343 hold_rsb(r);
344 }
345
346 /* When all references to the rsb are gone it's transferred to
347 the tossed list for later disposal. */
348
349 static void put_rsb(struct dlm_rsb *r)
350 {
351 struct dlm_ls *ls = r->res_ls;
352 uint32_t bucket = r->res_bucket;
353 int rv;
354
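	/* kref_put_lock() only takes the bucket lock if the refcount drops
	   to zero; in that case toss_rsb() has already run under the lock
	   and the lock is released here. */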
355 rv = kref_put_lock(&r->res_ref, toss_rsb,
356 &ls->ls_rsbtbl[bucket].lock);
357 if (rv)
358 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
359 }
360
361 void dlm_put_rsb(struct dlm_rsb *r)
362 {
363 put_rsb(r);
364 }
365
366 static int pre_rsb_struct(struct dlm_ls *ls)
367 {
368 struct dlm_rsb *r1, *r2;
369 int count = 0;
370
371 spin_lock(&ls->ls_new_rsb_spin);
372 if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
373 spin_unlock(&ls->ls_new_rsb_spin);
374 return 0;
375 }
376 spin_unlock(&ls->ls_new_rsb_spin);
377
378 r1 = dlm_allocate_rsb(ls);
379 r2 = dlm_allocate_rsb(ls);
380
381 spin_lock(&ls->ls_new_rsb_spin);
382 if (r1) {
383 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
384 ls->ls_new_rsb_count++;
385 }
386 if (r2) {
387 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
388 ls->ls_new_rsb_count++;
389 }
390 count = ls->ls_new_rsb_count;
391 spin_unlock(&ls->ls_new_rsb_spin);
392
393 if (!count)
394 return -ENOMEM;
395 return 0;
396 }
397
398 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
399 unlock any spinlocks, go back and call pre_rsb_struct again.
400 Otherwise, take an rsb off the list and return it. */
401
402 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
403 struct dlm_rsb **r_ret)
404 {
405 struct dlm_rsb *r;
406 int count;
407
408 spin_lock(&ls->ls_new_rsb_spin);
409 if (list_empty(&ls->ls_new_rsb)) {
410 count = ls->ls_new_rsb_count;
411 spin_unlock(&ls->ls_new_rsb_spin);
412 log_debug(ls, "find_rsb retry %d %d %s",
413 count, dlm_config.ci_new_rsb_count, name);
414 return -EAGAIN;
415 }
416
417 r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
418 list_del(&r->res_hashchain);
419 /* Convert the empty list_head to a NULL rb_node for tree usage: */
420 memset(&r->res_hashnode, 0, sizeof(struct rb_node));
421 ls->ls_new_rsb_count--;
422 spin_unlock(&ls->ls_new_rsb_spin);
423
424 r->res_ls = ls;
425 r->res_length = len;
426 memcpy(r->res_name, name, len);
427 mutex_init(&r->res_mutex);
428
429 INIT_LIST_HEAD(&r->res_lookup);
430 INIT_LIST_HEAD(&r->res_grantqueue);
431 INIT_LIST_HEAD(&r->res_convertqueue);
432 INIT_LIST_HEAD(&r->res_waitqueue);
433 INIT_LIST_HEAD(&r->res_root_list);
434 INIT_LIST_HEAD(&r->res_recover_list);
435
436 *r_ret = r;
437 return 0;
438 }
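
/* Typical caller pattern for the two helpers above, as used by
   find_rsb_dir()/find_rsb_nodir() below (sketch only):

   retry:
	error = pre_rsb_struct(ls);
	if (error < 0)
		goto out;

	spin_lock(&ls->ls_rsbtbl[b].lock);
	...
	error = get_rsb_struct(ls, name, len, &r);
	if (error == -EAGAIN) {
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		goto retry;
	}
*/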
439
440 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
441 {
442 char maxname[DLM_RESNAME_MAXLEN];
443
444 memset(maxname, 0, DLM_RESNAME_MAXLEN);
445 memcpy(maxname, name, nlen);
446 return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
447 }
448
449 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
450 struct dlm_rsb **r_ret)
451 {
452 struct rb_node *node = tree->rb_node;
453 struct dlm_rsb *r;
454 int rc;
455
456 while (node) {
457 r = rb_entry(node, struct dlm_rsb, res_hashnode);
458 rc = rsb_cmp(r, name, len);
459 if (rc < 0)
460 node = node->rb_left;
461 else if (rc > 0)
462 node = node->rb_right;
463 else
464 goto found;
465 }
466 *r_ret = NULL;
467 return -EBADR;
468
469 found:
470 *r_ret = r;
471 return 0;
472 }
473
474 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
475 {
476 struct rb_node **newn = &tree->rb_node;
477 struct rb_node *parent = NULL;
478 int rc;
479
480 while (*newn) {
481 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
482 res_hashnode);
483
484 parent = *newn;
485 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
486 if (rc < 0)
487 newn = &parent->rb_left;
488 else if (rc > 0)
489 newn = &parent->rb_right;
490 else {
491 log_print("rsb_insert match");
492 dlm_dump_rsb(rsb);
493 dlm_dump_rsb(cur);
494 return -EEXIST;
495 }
496 }
497
498 rb_link_node(&rsb->res_hashnode, parent, newn);
499 rb_insert_color(&rsb->res_hashnode, tree);
500 return 0;
501 }
502
503 /*
504 * Find rsb in rsbtbl and potentially create/add one
505 *
506 * Delaying the release of rsb's has a similar benefit to applications keeping
507 * NL locks on an rsb, but without the guarantee that the cached master value
508 * will still be valid when the rsb is reused. Apps aren't always smart enough
509 * to keep NL locks on an rsb that they may lock again shortly; this can lead
510 * to excessive master lookups and removals if we don't delay the release.
511 *
512 * Searching for an rsb means looking through both the normal list and toss
513 * list. When found on the toss list the rsb is moved to the normal list with
514 * ref count of 1; when found on normal list the ref count is incremented.
515 *
516 * rsb's on the keep list are being used locally and refcounted.
517 * rsb's on the toss list are not being used locally, and are not refcounted.
518 *
519 * The toss list rsb's were either
520 * - previously used locally but not any more (were on keep list, then
521 * moved to toss list when last refcount dropped)
522 * - created and put on toss list as a directory record for a lookup
523 * (we are the dir node for the res, but are not using the res right now,
524 * but some other node is)
525 *
526 * The purpose of find_rsb() is to return a refcounted rsb for local use.
527 * So, if the given rsb is on the toss list, it is moved to the keep list
528 * before being returned.
529 *
530 * toss_rsb() happens when all local usage of the rsb is done, i.e. no
531 * more refcounts exist, so the rsb is moved from the keep list to the
532 * toss list.
533 *
534 * rsb's on both keep and toss lists are used for doing name-to-master
535 * lookups. rsb's that are in use locally (and being refcounted) are on
536 * the keep list, rsb's that are not in use locally (not refcounted) and
537 * only exist for name/master lookups are on the toss list.
538 *
539 * rsb's on the toss list whose dir_nodeid is not local can have stale
540 * name/master mappings. So, remote requests on such rsb's can potentially
541 * return with an error, which means the mapping is stale and needs to
542 * be updated with a new lookup. (The idea behind MASTER UNCERTAIN and
543 * first_lkid is to keep only a single outstanding request on an rsb
544 * while that rsb has a potentially stale master.)
545 */
546
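/* For example, stage 2 of a locally originated request obtains its rsb with
   a call of the form (sketch; the real call site is request_lock() further
   down this file):

	error = find_rsb(ls, name, namelen, 0, R_REQUEST, &r);

   from_nodeid 0 plus R_REQUEST marks the request as local, which lets
   find_rsb_dir() below create the rsb if it does not exist yet. */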
547 static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
548 uint32_t hash, uint32_t b,
549 int dir_nodeid, int from_nodeid,
550 unsigned int flags, struct dlm_rsb **r_ret)
551 {
552 struct dlm_rsb *r = NULL;
553 int our_nodeid = dlm_our_nodeid();
554 int from_local = 0;
555 int from_other = 0;
556 int from_dir = 0;
557 int create = 0;
558 int error;
559
560 if (flags & R_RECEIVE_REQUEST) {
561 if (from_nodeid == dir_nodeid)
562 from_dir = 1;
563 else
564 from_other = 1;
565 } else if (flags & R_REQUEST) {
566 from_local = 1;
567 }
568
569 /*
570 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
571 * from_nodeid has sent us a lock in dlm_recover_locks, believing
572 * we're the new master. Our local recovery may not have set
573 * res_master_nodeid to our_nodeid yet, so allow either. Don't
574 * create the rsb; dlm_recover_process_copy() will handle EBADR
575 * by resending.
576 *
577 * If someone sends us a request, we are the dir node, and we do
578 * not find the rsb anywhere, then recreate it. This happens if
579 * someone sends us a request after we have removed/freed an rsb
580 * from our toss list. (They sent a request instead of lookup
581 * because they are using an rsb from their toss list.)
582 */
583
584 if (from_local || from_dir ||
585 (from_other && (dir_nodeid == our_nodeid))) {
586 create = 1;
587 }
588
589 retry:
590 if (create) {
591 error = pre_rsb_struct(ls);
592 if (error < 0)
593 goto out;
594 }
595
596 spin_lock(&ls->ls_rsbtbl[b].lock);
597
598 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
599 if (error)
600 goto do_toss;
601
602 /*
603 * rsb is active, so we can't check master_nodeid without lock_rsb.
604 */
605
606 kref_get(&r->res_ref);
607 goto out_unlock;
608
609
610 do_toss:
611 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
612 if (error)
613 goto do_new;
614
615 /*
616 * rsb found inactive (master_nodeid may be out of date unless
617 * we are the dir_nodeid or were the master) No other thread
618 * is using this rsb because it's on the toss list, so we can
619 * look at or update res_master_nodeid without lock_rsb.
620 */
621
622 if ((r->res_master_nodeid != our_nodeid) && from_other) {
623 /* our rsb was not master, and another node (not the dir node)
624 has sent us a request */
625 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
626 from_nodeid, r->res_master_nodeid, dir_nodeid,
627 r->res_name);
628 error = -ENOTBLK;
629 goto out_unlock;
630 }
631
632 if ((r->res_master_nodeid != our_nodeid) && from_dir) {
633 /* don't think this should ever happen */
634 log_error(ls, "find_rsb toss from_dir %d master %d",
635 from_nodeid, r->res_master_nodeid);
636 dlm_print_rsb(r);
637 /* fix it and go on */
638 r->res_master_nodeid = our_nodeid;
639 r->res_nodeid = 0;
640 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
641 r->res_first_lkid = 0;
642 }
643
644 if (from_local && (r->res_master_nodeid != our_nodeid)) {
645 /* Because we have held no locks on this rsb,
646 res_master_nodeid could have become stale. */
647 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
648 r->res_first_lkid = 0;
649 }
650
651 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
652 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
653 goto out_unlock;
654
655
656 do_new:
657 /*
658 * rsb not found
659 */
660
661 if (error == -EBADR && !create)
662 goto out_unlock;
663
664 error = get_rsb_struct(ls, name, len, &r);
665 if (error == -EAGAIN) {
666 spin_unlock(&ls->ls_rsbtbl[b].lock);
667 goto retry;
668 }
669 if (error)
670 goto out_unlock;
671
672 r->res_hash = hash;
673 r->res_bucket = b;
674 r->res_dir_nodeid = dir_nodeid;
675 kref_init(&r->res_ref);
676
677 if (from_dir) {
678 /* want to see how often this happens */
679 log_debug(ls, "find_rsb new from_dir %d recreate %s",
680 from_nodeid, r->res_name);
681 r->res_master_nodeid = our_nodeid;
682 r->res_nodeid = 0;
683 goto out_add;
684 }
685
686 if (from_other && (dir_nodeid != our_nodeid)) {
687 /* should never happen */
688 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
689 from_nodeid, dir_nodeid, our_nodeid, r->res_name);
690 dlm_free_rsb(r);
691 r = NULL;
692 error = -ENOTBLK;
693 goto out_unlock;
694 }
695
696 if (from_other) {
697 log_debug(ls, "find_rsb new from_other %d dir %d %s",
698 from_nodeid, dir_nodeid, r->res_name);
699 }
700
701 if (dir_nodeid == our_nodeid) {
702 /* When we are the dir nodeid, we can set the master
703 node immediately */
704 r->res_master_nodeid = our_nodeid;
705 r->res_nodeid = 0;
706 } else {
707 /* set_master will send_lookup to dir_nodeid */
708 r->res_master_nodeid = 0;
709 r->res_nodeid = -1;
710 }
711
712 out_add:
713 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
714 out_unlock:
715 spin_unlock(&ls->ls_rsbtbl[b].lock);
716 out:
717 *r_ret = r;
718 return error;
719 }
720
721 /* During recovery, other nodes can send us new MSTCPY locks (from
722 dlm_recover_locks) before we've made ourselves master (in
723 dlm_recover_masters). */
724
725 static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
726 uint32_t hash, uint32_t b,
727 int dir_nodeid, int from_nodeid,
728 unsigned int flags, struct dlm_rsb **r_ret)
729 {
730 struct dlm_rsb *r = NULL;
731 int our_nodeid = dlm_our_nodeid();
732 int recover = (flags & R_RECEIVE_RECOVER);
733 int error;
734
735 retry:
736 error = pre_rsb_struct(ls);
737 if (error < 0)
738 goto out;
739
740 spin_lock(&ls->ls_rsbtbl[b].lock);
741
742 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
743 if (error)
744 goto do_toss;
745
746 /*
747 * rsb is active, so we can't check master_nodeid without lock_rsb.
748 */
749
750 kref_get(&r->res_ref);
751 goto out_unlock;
752
753
754 do_toss:
755 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
756 if (error)
757 goto do_new;
758
759 /*
760 * rsb found inactive. No other thread is using this rsb because
761 * it's on the toss list, so we can look at or update
762 * res_master_nodeid without lock_rsb.
763 */
764
765 if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
766 /* our rsb is not master, and another node has sent us a
767 request; this should never happen */
768 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
769 from_nodeid, r->res_master_nodeid, dir_nodeid);
770 dlm_print_rsb(r);
771 error = -ENOTBLK;
772 goto out_unlock;
773 }
774
775 if (!recover && (r->res_master_nodeid != our_nodeid) &&
776 (dir_nodeid == our_nodeid)) {
777 /* our rsb is not master, and we are dir; may as well fix it;
778 this should never happen */
779 log_error(ls, "find_rsb toss our %d master %d dir %d",
780 our_nodeid, r->res_master_nodeid, dir_nodeid);
781 dlm_print_rsb(r);
782 r->res_master_nodeid = our_nodeid;
783 r->res_nodeid = 0;
784 }
785
786 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
787 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
788 goto out_unlock;
789
790
791 do_new:
792 /*
793 * rsb not found
794 */
795
796 error = get_rsb_struct(ls, name, len, &r);
797 if (error == -EAGAIN) {
798 spin_unlock(&ls->ls_rsbtbl[b].lock);
799 goto retry;
800 }
801 if (error)
802 goto out_unlock;
803
804 r->res_hash = hash;
805 r->res_bucket = b;
806 r->res_dir_nodeid = dir_nodeid;
807 r->res_master_nodeid = dir_nodeid;
808 r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
809 kref_init(&r->res_ref);
810
811 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
812 out_unlock:
813 spin_unlock(&ls->ls_rsbtbl[b].lock);
814 out:
815 *r_ret = r;
816 return error;
817 }
818
819 static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
820 unsigned int flags, struct dlm_rsb **r_ret)
821 {
822 uint32_t hash, b;
823 int dir_nodeid;
824
825 if (len > DLM_RESNAME_MAXLEN)
826 return -EINVAL;
827
828 hash = jhash(name, len, 0);
829 b = hash & (ls->ls_rsbtbl_size - 1);
830
831 dir_nodeid = dlm_hash2nodeid(ls, hash);
832
833 if (dlm_no_directory(ls))
834 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
835 from_nodeid, flags, r_ret);
836 else
837 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
838 from_nodeid, flags, r_ret);
839 }
840
841 /* we have received a request and found that res_master_nodeid != our_nodeid,
842 so we need to return an error or make ourselves the master */
843
844 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
845 int from_nodeid)
846 {
847 if (dlm_no_directory(ls)) {
848 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
849 from_nodeid, r->res_master_nodeid,
850 r->res_dir_nodeid);
851 dlm_print_rsb(r);
852 return -ENOTBLK;
853 }
854
855 if (from_nodeid != r->res_dir_nodeid) {
856 /* our rsb is not master, and another node (not the dir node)
857 has sent us a request. this is much more common when our
858 master_nodeid is zero, so limit debug to non-zero. */
859
860 if (r->res_master_nodeid) {
861 log_debug(ls, "validate master from_other %d master %d "
862 "dir %d first %x %s", from_nodeid,
863 r->res_master_nodeid, r->res_dir_nodeid,
864 r->res_first_lkid, r->res_name);
865 }
866 return -ENOTBLK;
867 } else {
868 /* our rsb is not master, but the dir nodeid has sent us a
869 request; this could happen with master 0 / res_nodeid -1 */
870
871 if (r->res_master_nodeid) {
872 log_error(ls, "validate master from_dir %d master %d "
873 "first %x %s",
874 from_nodeid, r->res_master_nodeid,
875 r->res_first_lkid, r->res_name);
876 }
877
878 r->res_master_nodeid = dlm_our_nodeid();
879 r->res_nodeid = 0;
880 return 0;
881 }
882 }
883
884 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
885 int from_nodeid, bool toss_list, unsigned int flags,
886 int *r_nodeid, int *result)
887 {
888 int fix_master = (flags & DLM_LU_RECOVER_MASTER);
889 int from_master = (flags & DLM_LU_RECOVER_DIR);
890
891 if (r->res_dir_nodeid != our_nodeid) {
892 /* should not happen, but may as well fix it and carry on */
893 log_error(ls, "%s res_dir %d our %d %s", __func__,
894 r->res_dir_nodeid, our_nodeid, r->res_name);
895 r->res_dir_nodeid = our_nodeid;
896 }
897
898 if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
899 /* Recovery uses this function to set a new master when
900 * the previous master failed. Setting NEW_MASTER will
901 * force dlm_recover_masters to call recover_master on this
902 * rsb even though the res_nodeid is no longer removed.
903 */
904
905 r->res_master_nodeid = from_nodeid;
906 r->res_nodeid = from_nodeid;
907 rsb_set_flag(r, RSB_NEW_MASTER);
908
909 if (toss_list) {
910 /* I don't think we should ever find it on toss list. */
911 log_error(ls, "%s fix_master on toss", __func__);
912 dlm_dump_rsb(r);
913 }
914 }
915
916 if (from_master && (r->res_master_nodeid != from_nodeid)) {
917 /* this will happen if from_nodeid became master during
918 * a previous recovery cycle, and we aborted the previous
919 * cycle before recovering this master value
920 */
921
922 log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
923 __func__, from_nodeid, r->res_master_nodeid,
924 r->res_nodeid, r->res_first_lkid, r->res_name);
925
926 if (r->res_master_nodeid == our_nodeid) {
927 log_error(ls, "from_master %d our_master", from_nodeid);
928 dlm_dump_rsb(r);
929 goto ret_assign;
930 }
931
932 r->res_master_nodeid = from_nodeid;
933 r->res_nodeid = from_nodeid;
934 rsb_set_flag(r, RSB_NEW_MASTER);
935 }
936
937 if (!r->res_master_nodeid) {
938 /* this will happen if recovery happens while we're looking
939 * up the master for this rsb
940 */
941
942 log_debug(ls, "%s master 0 to %d first %x %s", __func__,
943 from_nodeid, r->res_first_lkid, r->res_name);
944 r->res_master_nodeid = from_nodeid;
945 r->res_nodeid = from_nodeid;
946 }
947
948 if (!from_master && !fix_master &&
949 (r->res_master_nodeid == from_nodeid)) {
950 /* this can happen when the master sends remove, the dir node
951 * finds the rsb on the keep list and ignores the remove,
952 * and the former master sends a lookup
953 */
954
955 log_limit(ls, "%s from master %d flags %x first %x %s",
956 __func__, from_nodeid, flags, r->res_first_lkid,
957 r->res_name);
958 }
959
960 ret_assign:
961 *r_nodeid = r->res_master_nodeid;
962 if (result)
963 *result = DLM_LU_MATCH;
964 }
965
966 /*
967 * We're the dir node for this res and another node wants to know the
968 * master nodeid. During normal operation (non recovery) this is only
969 * called from receive_lookup(); master lookups when the local node is
970 * the dir node are done by find_rsb().
971 *
972 * normal operation, we are the dir node for a resource
973 * . _request_lock
974 * . set_master
975 * . send_lookup
976 * . receive_lookup
977 * . dlm_master_lookup flags 0
978 *
979 * recover directory, we are rebuilding dir for all resources
980 * . dlm_recover_directory
981 * . dlm_rcom_names
982 * remote node sends back the rsb names it is master of and we are dir of
983 * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
984 * we either create new rsb setting remote node as master, or find existing
985 * rsb and set master to be the remote node.
986 *
987 * recover masters, we are finding the new master for resources
988 * . dlm_recover_masters
989 * . recover_master
990 * . dlm_send_rcom_lookup
991 * . receive_rcom_lookup
992 * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
993 */
994
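/* In the normal-operation chain above, the dir node's receive_lookup()
   handler ends up calling roughly (sketch, not the literal call site):

	error = dlm_master_lookup(ls, from_nodeid, name, len, 0,
				  &r_nodeid, &result);

   and returns r_nodeid to the requesting node as the master nodeid. */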
995 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
996 unsigned int flags, int *r_nodeid, int *result)
997 {
998 struct dlm_rsb *r = NULL;
999 uint32_t hash, b;
1000 int our_nodeid = dlm_our_nodeid();
1001 int dir_nodeid, error;
1002
1003 if (len > DLM_RESNAME_MAXLEN)
1004 return -EINVAL;
1005
1006 if (from_nodeid == our_nodeid) {
1007 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
1008 our_nodeid, flags);
1009 return -EINVAL;
1010 }
1011
1012 hash = jhash(name, len, 0);
1013 b = hash & (ls->ls_rsbtbl_size - 1);
1014
1015 dir_nodeid = dlm_hash2nodeid(ls, hash);
1016 if (dir_nodeid != our_nodeid) {
1017 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
1018 from_nodeid, dir_nodeid, our_nodeid, hash,
1019 ls->ls_num_nodes);
1020 *r_nodeid = -1;
1021 return -EINVAL;
1022 }
1023
1024 retry:
1025 error = pre_rsb_struct(ls);
1026 if (error < 0)
1027 return error;
1028
1029 spin_lock(&ls->ls_rsbtbl[b].lock);
1030 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1031 if (!error) {
1032 /* because the rsb is active, we need to lock_rsb before
1033 * checking/changing res_master_nodeid
1034 */
1035
1036 hold_rsb(r);
1037 spin_unlock(&ls->ls_rsbtbl[b].lock);
1038 lock_rsb(r);
1039
1040 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
1041 flags, r_nodeid, result);
1042
1043 /* the rsb was active */
1044 unlock_rsb(r);
1045 put_rsb(r);
1046
1047 return 0;
1048 }
1049
1050 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1051 if (error)
1052 goto not_found;
1053
1054 /* because the rsb is inactive (on toss list), it's not refcounted
1055 * and lock_rsb is not used, but is protected by the rsbtbl lock
1056 */
1057
1058 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
1059 r_nodeid, result);
1060
1061 r->res_toss_time = jiffies;
1062 /* the rsb was inactive (on toss list) */
1063 spin_unlock(&ls->ls_rsbtbl[b].lock);
1064
1065 return 0;
1066
1067 not_found:
1068 error = get_rsb_struct(ls, name, len, &r);
1069 if (error == -EAGAIN) {
1070 spin_unlock(&ls->ls_rsbtbl[b].lock);
1071 goto retry;
1072 }
1073 if (error)
1074 goto out_unlock;
1075
1076 r->res_hash = hash;
1077 r->res_bucket = b;
1078 r->res_dir_nodeid = our_nodeid;
1079 r->res_master_nodeid = from_nodeid;
1080 r->res_nodeid = from_nodeid;
1081 kref_init(&r->res_ref);
1082 r->res_toss_time = jiffies;
1083
1084 error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
1085 if (error) {
1086 /* should never happen */
1087 dlm_free_rsb(r);
1088 spin_unlock(&ls->ls_rsbtbl[b].lock);
1089 goto retry;
1090 }
1091
1092 if (result)
1093 *result = DLM_LU_ADD;
1094 *r_nodeid = from_nodeid;
1095 out_unlock:
1096 spin_unlock(&ls->ls_rsbtbl[b].lock);
1097 return error;
1098 }
1099
1100 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1101 {
1102 struct rb_node *n;
1103 struct dlm_rsb *r;
1104 int i;
1105
1106 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1107 spin_lock(&ls->ls_rsbtbl[i].lock);
1108 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1109 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1110 if (r->res_hash == hash)
1111 dlm_dump_rsb(r);
1112 }
1113 spin_unlock(&ls->ls_rsbtbl[i].lock);
1114 }
1115 }
1116
1117 void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
1118 {
1119 struct dlm_rsb *r = NULL;
1120 uint32_t hash, b;
1121 int error;
1122
1123 hash = jhash(name, len, 0);
1124 b = hash & (ls->ls_rsbtbl_size - 1);
1125
1126 spin_lock(&ls->ls_rsbtbl[b].lock);
1127 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1128 if (!error)
1129 goto out_dump;
1130
1131 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1132 if (error)
1133 goto out;
1134 out_dump:
1135 dlm_dump_rsb(r);
1136 out:
1137 spin_unlock(&ls->ls_rsbtbl[b].lock);
1138 }
1139
1140 static void toss_rsb(struct kref *kref)
1141 {
1142 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1143 struct dlm_ls *ls = r->res_ls;
1144
1145 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1146 kref_init(&r->res_ref);
1147 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1148 rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1149 r->res_toss_time = jiffies;
1150 ls->ls_rsbtbl[r->res_bucket].flags |= DLM_RTF_SHRINK;
1151 if (r->res_lvbptr) {
1152 dlm_free_lvb(r->res_lvbptr);
1153 r->res_lvbptr = NULL;
1154 }
1155 }
1156
1157 /* See comment for unhold_lkb */
1158
1159 static void unhold_rsb(struct dlm_rsb *r)
1160 {
1161 int rv;
1162 rv = kref_put(&r->res_ref, toss_rsb);
1163 DLM_ASSERT(!rv, dlm_dump_rsb(r););
1164 }
1165
1166 static void kill_rsb(struct kref *kref)
1167 {
1168 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1169
1170 /* All work is done after the return from kref_put() so we
1171 can release the write_lock before the remove and free. */
1172
1173 DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1174 DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1175 DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1176 DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1177 DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1178 DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1179 }
1180
1181 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1182 The rsb must exist as long as any lkb's for it do. */
1183
1184 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1185 {
1186 hold_rsb(r);
1187 lkb->lkb_resource = r;
1188 }
1189
1190 static void detach_lkb(struct dlm_lkb *lkb)
1191 {
1192 if (lkb->lkb_resource) {
1193 put_rsb(lkb->lkb_resource);
1194 lkb->lkb_resource = NULL;
1195 }
1196 }
1197
1198 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1199 int start, int end)
1200 {
1201 struct dlm_lkb *lkb;
1202 int rv;
1203
1204 lkb = dlm_allocate_lkb(ls);
1205 if (!lkb)
1206 return -ENOMEM;
1207
1208 lkb->lkb_nodeid = -1;
1209 lkb->lkb_grmode = DLM_LOCK_IV;
1210 kref_init(&lkb->lkb_ref);
1211 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1212 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1213 INIT_LIST_HEAD(&lkb->lkb_time_list);
1214 INIT_LIST_HEAD(&lkb->lkb_cb_list);
1215 mutex_init(&lkb->lkb_cb_mutex);
1216 INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1217
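	/* idr_preload()/idr_preload_end() bracket the allocation so that
	   idr_alloc() can run under ls_lkbidr_spin with GFP_NOWAIT; any
	   backing memory is preallocated outside the spinlock with GFP_NOFS. */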
1218 idr_preload(GFP_NOFS);
1219 spin_lock(&ls->ls_lkbidr_spin);
1220 rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
1221 if (rv >= 0)
1222 lkb->lkb_id = rv;
1223 spin_unlock(&ls->ls_lkbidr_spin);
1224 idr_preload_end();
1225
1226 if (rv < 0) {
1227 log_error(ls, "create_lkb idr error %d", rv);
1228 dlm_free_lkb(lkb);
1229 return rv;
1230 }
1231
1232 *lkb_ret = lkb;
1233 return 0;
1234 }
1235
1236 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1237 {
1238 return _create_lkb(ls, lkb_ret, 1, 0);
1239 }
1240
1241 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1242 {
1243 struct dlm_lkb *lkb;
1244
1245 spin_lock(&ls->ls_lkbidr_spin);
1246 lkb = idr_find(&ls->ls_lkbidr, lkid);
1247 if (lkb)
1248 kref_get(&lkb->lkb_ref);
1249 spin_unlock(&ls->ls_lkbidr_spin);
1250
1251 *lkb_ret = lkb;
1252 return lkb ? 0 : -ENOENT;
1253 }
1254
1255 static void kill_lkb(struct kref *kref)
1256 {
1257 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1258
1259 /* All work is done after the return from kref_put() so we
1260 can release the write_lock before the detach_lkb */
1261
1262 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1263 }
1264
1265 /* __put_lkb() is used when an lkb may not have an rsb attached to
1266 it so we need to provide the lockspace explicitly */
1267
1268 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1269 {
1270 uint32_t lkid = lkb->lkb_id;
1271 int rv;
1272
1273 rv = kref_put_lock(&lkb->lkb_ref, kill_lkb,
1274 &ls->ls_lkbidr_spin);
1275 if (rv) {
1276 idr_remove(&ls->ls_lkbidr, lkid);
1277 spin_unlock(&ls->ls_lkbidr_spin);
1278
1279 detach_lkb(lkb);
1280
1281 /* for local/process lkbs, lvbptr points to caller's lksb */
1282 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1283 dlm_free_lvb(lkb->lkb_lvbptr);
1284 dlm_free_lkb(lkb);
1285 }
1286
1287 return rv;
1288 }
1289
1290 int dlm_put_lkb(struct dlm_lkb *lkb)
1291 {
1292 struct dlm_ls *ls;
1293
1294 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1295 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1296
1297 ls = lkb->lkb_resource->res_ls;
1298 return __put_lkb(ls, lkb);
1299 }
1300
1301 /* This is only called to add a reference when the code already holds
1302 a valid reference to the lkb, so there's no need for locking. */
1303
1304 static inline void hold_lkb(struct dlm_lkb *lkb)
1305 {
1306 kref_get(&lkb->lkb_ref);
1307 }
1308
1309 /* This is called when we need to remove a reference and are certain
1310 it's not the last ref. e.g. del_lkb is always called between a
1311 find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1312 put_lkb would work fine, but would involve unnecessary locking */
1313
1314 static inline void unhold_lkb(struct dlm_lkb *lkb)
1315 {
1316 int rv;
1317 rv = kref_put(&lkb->lkb_ref, kill_lkb);
1318 DLM_ASSERT(!rv, dlm_print_lkb(lkb););
1319 }
1320
1321 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1322 int mode)
1323 {
1324 struct dlm_lkb *lkb = NULL, *iter;
1325
1326 list_for_each_entry(iter, head, lkb_statequeue)
1327 if (iter->lkb_rqmode < mode) {
1328 lkb = iter;
1329 list_add_tail(new, &iter->lkb_statequeue);
1330 break;
1331 }
1332
1333 if (!lkb)
1334 list_add_tail(new, head);
1335 }
1336
1337 /* add/remove lkb to rsb's grant/convert/wait queue */
1338
1339 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1340 {
1341 kref_get(&lkb->lkb_ref);
1342
1343 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1344
1345 lkb->lkb_timestamp = ktime_get();
1346
1347 lkb->lkb_status = status;
1348
1349 switch (status) {
1350 case DLM_LKSTS_WAITING:
1351 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1352 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1353 else
1354 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1355 break;
1356 case DLM_LKSTS_GRANTED:
1357 /* convention says granted locks kept in order of grmode */
1358 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1359 lkb->lkb_grmode);
1360 break;
1361 case DLM_LKSTS_CONVERT:
1362 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1363 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1364 else
1365 list_add_tail(&lkb->lkb_statequeue,
1366 &r->res_convertqueue);
1367 break;
1368 default:
1369 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1370 }
1371 }
1372
1373 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1374 {
1375 lkb->lkb_status = 0;
1376 list_del(&lkb->lkb_statequeue);
1377 unhold_lkb(lkb);
1378 }
1379
1380 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1381 {
1382 hold_lkb(lkb);
1383 del_lkb(r, lkb);
1384 add_lkb(r, lkb, sts);
1385 unhold_lkb(lkb);
1386 }
1387
1388 static int msg_reply_type(int mstype)
1389 {
1390 switch (mstype) {
1391 case DLM_MSG_REQUEST:
1392 return DLM_MSG_REQUEST_REPLY;
1393 case DLM_MSG_CONVERT:
1394 return DLM_MSG_CONVERT_REPLY;
1395 case DLM_MSG_UNLOCK:
1396 return DLM_MSG_UNLOCK_REPLY;
1397 case DLM_MSG_CANCEL:
1398 return DLM_MSG_CANCEL_REPLY;
1399 case DLM_MSG_LOOKUP:
1400 return DLM_MSG_LOOKUP_REPLY;
1401 }
1402 return -1;
1403 }
1404
1405 static int nodeid_warned(int nodeid, int num_nodes, int *warned)
1406 {
1407 int i;
1408
1409 for (i = 0; i < num_nodes; i++) {
1410 if (!warned[i]) {
1411 warned[i] = nodeid;
1412 return 0;
1413 }
1414 if (warned[i] == nodeid)
1415 return 1;
1416 }
1417 return 0;
1418 }
1419
1420 void dlm_scan_waiters(struct dlm_ls *ls)
1421 {
1422 struct dlm_lkb *lkb;
1423 s64 us;
1424 s64 debug_maxus = 0;
1425 u32 debug_scanned = 0;
1426 u32 debug_expired = 0;
1427 int num_nodes = 0;
1428 int *warned = NULL;
1429
1430 if (!dlm_config.ci_waitwarn_us)
1431 return;
1432
1433 mutex_lock(&ls->ls_waiters_mutex);
1434
1435 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1436 if (!lkb->lkb_wait_time)
1437 continue;
1438
1439 debug_scanned++;
1440
1441 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
1442
1443 if (us < dlm_config.ci_waitwarn_us)
1444 continue;
1445
1446 lkb->lkb_wait_time = 0;
1447
1448 debug_expired++;
1449 if (us > debug_maxus)
1450 debug_maxus = us;
1451
1452 if (!num_nodes) {
1453 num_nodes = ls->ls_num_nodes;
1454 warned = kcalloc(num_nodes, sizeof(int), GFP_KERNEL);
1455 }
1456 if (!warned)
1457 continue;
1458 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
1459 continue;
1460
1461 log_error(ls, "waitwarn %x %lld %d us check connection to "
1462 "node %d", lkb->lkb_id, (long long)us,
1463 dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
1464 }
1465 mutex_unlock(&ls->ls_waiters_mutex);
1466 kfree(warned);
1467
1468 if (debug_expired)
1469 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
1470 debug_scanned, debug_expired,
1471 dlm_config.ci_waitwarn_us, (long long)debug_maxus);
1472 }
1473
1474 /* add/remove lkb from global waiters list of lkb's waiting for
1475 a reply from a remote node */
1476
1477 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1478 {
1479 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1480 int error = 0;
1481
1482 mutex_lock(&ls->ls_waiters_mutex);
1483
1484 if (is_overlap_unlock(lkb) ||
1485 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1486 error = -EINVAL;
1487 goto out;
1488 }
1489
1490 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1491 switch (mstype) {
1492 case DLM_MSG_UNLOCK:
1493 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1494 break;
1495 case DLM_MSG_CANCEL:
1496 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1497 break;
1498 default:
1499 error = -EBUSY;
1500 goto out;
1501 }
1502 lkb->lkb_wait_count++;
1503 hold_lkb(lkb);
1504
1505 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1506 lkb->lkb_id, lkb->lkb_wait_type, mstype,
1507 lkb->lkb_wait_count, lkb->lkb_flags);
1508 goto out;
1509 }
1510
1511 DLM_ASSERT(!lkb->lkb_wait_count,
1512 dlm_print_lkb(lkb);
1513 printk("wait_count %d\n", lkb->lkb_wait_count););
1514
1515 lkb->lkb_wait_count++;
1516 lkb->lkb_wait_type = mstype;
1517 lkb->lkb_wait_time = ktime_get();
1518 lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1519 hold_lkb(lkb);
1520 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1521 out:
1522 if (error)
1523 log_error(ls, "addwait error %x %d flags %x %d %d %s",
1524 lkb->lkb_id, error, lkb->lkb_flags, mstype,
1525 lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1526 mutex_unlock(&ls->ls_waiters_mutex);
1527 return error;
1528 }
1529
1530 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1531 list as part of process_requestqueue (e.g. a lookup that has an optimized
1532 request reply on the requestqueue) between dlm_recover_waiters_pre() which
1533 set RESEND and dlm_recover_waiters_post() */
1534
1535 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1536 struct dlm_message *ms)
1537 {
1538 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1539 int overlap_done = 0;
1540
1541 if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
1542 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1543 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
1544 overlap_done = 1;
1545 goto out_del;
1546 }
1547
1548 if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
1549 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1550 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1551 overlap_done = 1;
1552 goto out_del;
1553 }
1554
1555 /* Cancel state was preemptively cleared by a successful convert,
1556 see next comment, nothing to do. */
1557
1558 if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1559 (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1560 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1561 lkb->lkb_id, lkb->lkb_wait_type);
1562 return -1;
1563 }
1564
1565 /* Remove for the convert reply, and preemptively remove for the
1566 cancel reply. A convert has been granted while there's still
1567 an outstanding cancel on it (the cancel is moot and the result
1568 in the cancel reply should be 0). We preempt the cancel reply
1569 because the app gets the convert result and then can follow up
1570 with another op, like convert. This subsequent op would see the
1571 lingering state of the cancel and fail with -EBUSY. */
1572
1573 if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1574 (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
1575 is_overlap_cancel(lkb) && ms && !ms->m_result) {
1576 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1577 lkb->lkb_id);
1578 lkb->lkb_wait_type = 0;
1579 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1580 lkb->lkb_wait_count--;
1581 unhold_lkb(lkb);
1582 goto out_del;
1583 }
1584
1585 /* N.B. type of reply may not always correspond to type of original
1586 msg due to lookup->request optimization, verify others? */
1587
1588 if (lkb->lkb_wait_type) {
1589 lkb->lkb_wait_type = 0;
1590 goto out_del;
1591 }
1592
1593 log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1594 lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1595 lkb->lkb_remid, mstype, lkb->lkb_flags);
1596 return -1;
1597
1598 out_del:
1599 /* the force-unlock/cancel has completed and we haven't received a reply
1600 to the op that was in progress prior to the unlock/cancel; we
1601 give up on any reply to the earlier op. FIXME: not sure when/how
1602 this would happen */
1603
1604 if (overlap_done && lkb->lkb_wait_type) {
1605 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1606 lkb->lkb_id, mstype, lkb->lkb_wait_type);
1607 lkb->lkb_wait_count--;
1608 unhold_lkb(lkb);
1609 lkb->lkb_wait_type = 0;
1610 }
1611
1612 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1613
1614 lkb->lkb_flags &= ~DLM_IFL_RESEND;
1615 lkb->lkb_wait_count--;
1616 if (!lkb->lkb_wait_count)
1617 list_del_init(&lkb->lkb_wait_reply);
1618 unhold_lkb(lkb);
1619 return 0;
1620 }
1621
1622 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1623 {
1624 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1625 int error;
1626
1627 mutex_lock(&ls->ls_waiters_mutex);
1628 error = _remove_from_waiters(lkb, mstype, NULL);
1629 mutex_unlock(&ls->ls_waiters_mutex);
1630 return error;
1631 }
1632
1633 /* Handles situations where we might be processing a "fake" or "stub" reply in
1634 which we can't try to take waiters_mutex again. */
1635
1636 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1637 {
1638 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1639 int error;
1640
1641 if (ms->m_flags != cpu_to_le32(DLM_IFL_STUB_MS))
1642 mutex_lock(&ls->ls_waiters_mutex);
1643 error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1644 if (ms->m_flags != cpu_to_le32(DLM_IFL_STUB_MS))
1645 mutex_unlock(&ls->ls_waiters_mutex);
1646 return error;
1647 }
1648
1649 /* If there's an rsb for the same resource being removed, ensure
1650 * that the remove message is sent before the new lookup message.
1651 */
1652
1653 #define DLM_WAIT_PENDING_COND(ls, r) \
1654 (ls->ls_remove_len && \
1655 !rsb_cmp(r, ls->ls_remove_name, \
1656 ls->ls_remove_len))
1657
1658 static void wait_pending_remove(struct dlm_rsb *r)
1659 {
1660 struct dlm_ls *ls = r->res_ls;
1661 restart:
1662 spin_lock(&ls->ls_remove_spin);
1663 if (DLM_WAIT_PENDING_COND(ls, r)) {
1664 log_debug(ls, "delay lookup for remove dir %d %s",
1665 r->res_dir_nodeid, r->res_name);
1666 spin_unlock(&ls->ls_remove_spin);
1667 wait_event(ls->ls_remove_wait, !DLM_WAIT_PENDING_COND(ls, r));
1668 goto restart;
1669 }
1670 spin_unlock(&ls->ls_remove_spin);
1671 }
1672
1673 /*
1674 * ls_remove_spin protects ls_remove_name and ls_remove_len which are
1675 * read by other threads in wait_pending_remove. ls_remove_names
1676 * and ls_remove_lens are only used by the scan thread, so they do
1677 * not need protection.
1678 */
1679
1680 static void shrink_bucket(struct dlm_ls *ls, int b)
1681 {
1682 struct rb_node *n, *next;
1683 struct dlm_rsb *r;
1684 char *name;
1685 int our_nodeid = dlm_our_nodeid();
1686 int remote_count = 0;
1687 int need_shrink = 0;
1688 int i, len, rv;
1689
1690 memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1691
1692 spin_lock(&ls->ls_rsbtbl[b].lock);
1693
1694 if (!(ls->ls_rsbtbl[b].flags & DLM_RTF_SHRINK)) {
1695 spin_unlock(&ls->ls_rsbtbl[b].lock);
1696 return;
1697 }
1698
1699 for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1700 next = rb_next(n);
1701 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1702
1703 /* If we're the directory record for this rsb, and
1704 we're not the master of it, then we need to wait
1705 for the master node to send us a dir remove for it
1706 before removing the dir record. */
1707
1708 if (!dlm_no_directory(ls) &&
1709 (r->res_master_nodeid != our_nodeid) &&
1710 (dlm_dir_nodeid(r) == our_nodeid)) {
1711 continue;
1712 }
1713
1714 need_shrink = 1;
1715
1716 if (!time_after_eq(jiffies, r->res_toss_time +
1717 dlm_config.ci_toss_secs * HZ)) {
1718 continue;
1719 }
1720
1721 if (!dlm_no_directory(ls) &&
1722 (r->res_master_nodeid == our_nodeid) &&
1723 (dlm_dir_nodeid(r) != our_nodeid)) {
1724
1725 /* We're the master of this rsb but we're not
1726 the directory record, so we need to tell the
1727 dir node to remove the dir record. */
1728
1729 ls->ls_remove_lens[remote_count] = r->res_length;
1730 memcpy(ls->ls_remove_names[remote_count], r->res_name,
1731 DLM_RESNAME_MAXLEN);
1732 remote_count++;
1733
1734 if (remote_count >= DLM_REMOVE_NAMES_MAX)
1735 break;
1736 continue;
1737 }
1738
1739 if (!kref_put(&r->res_ref, kill_rsb)) {
1740 log_error(ls, "tossed rsb in use %s", r->res_name);
1741 continue;
1742 }
1743
1744 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1745 dlm_free_rsb(r);
1746 }
1747
1748 if (need_shrink)
1749 ls->ls_rsbtbl[b].flags |= DLM_RTF_SHRINK;
1750 else
1751 ls->ls_rsbtbl[b].flags &= ~DLM_RTF_SHRINK;
1752 spin_unlock(&ls->ls_rsbtbl[b].lock);
1753
1754 /*
1755 * While searching for rsb's to free, we found some that require
1756 * remote removal. We leave them in place and find them again here
1757 * so there is a very small gap between removing them from the toss
1758 * list and sending the removal. Keeping this gap small is
1759 * important to keep us (the master node) from being out of sync
1760 * with the remote dir node for very long.
1761 *
1762 * From the time the rsb is removed from toss until just after
1763 * send_remove, the rsb name is saved in ls_remove_name. A new
1764 * lookup checks this to ensure that a new lookup message for the
1765 * same resource name is not sent just before the remove message.
1766 */
1767
1768 for (i = 0; i < remote_count; i++) {
1769 name = ls->ls_remove_names[i];
1770 len = ls->ls_remove_lens[i];
1771
1772 spin_lock(&ls->ls_rsbtbl[b].lock);
1773 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1774 if (rv) {
1775 spin_unlock(&ls->ls_rsbtbl[b].lock);
1776 log_debug(ls, "remove_name not toss %s", name);
1777 continue;
1778 }
1779
1780 if (r->res_master_nodeid != our_nodeid) {
1781 spin_unlock(&ls->ls_rsbtbl[b].lock);
1782 log_debug(ls, "remove_name master %d dir %d our %d %s",
1783 r->res_master_nodeid, r->res_dir_nodeid,
1784 our_nodeid, name);
1785 continue;
1786 }
1787
1788 if (r->res_dir_nodeid == our_nodeid) {
1789 /* should never happen */
1790 spin_unlock(&ls->ls_rsbtbl[b].lock);
1791 log_error(ls, "remove_name dir %d master %d our %d %s",
1792 r->res_dir_nodeid, r->res_master_nodeid,
1793 our_nodeid, name);
1794 continue;
1795 }
1796
1797 if (!time_after_eq(jiffies, r->res_toss_time +
1798 dlm_config.ci_toss_secs * HZ)) {
1799 spin_unlock(&ls->ls_rsbtbl[b].lock);
1800 log_debug(ls, "remove_name toss_time %lu now %lu %s",
1801 r->res_toss_time, jiffies, name);
1802 continue;
1803 }
1804
1805 if (!kref_put(&r->res_ref, kill_rsb)) {
1806 spin_unlock(&ls->ls_rsbtbl[b].lock);
1807 log_error(ls, "remove_name in use %s", name);
1808 continue;
1809 }
1810
1811 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1812
1813 /* block lookup of same name until we've sent remove */
1814 spin_lock(&ls->ls_remove_spin);
1815 ls->ls_remove_len = len;
1816 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
1817 spin_unlock(&ls->ls_remove_spin);
1818 spin_unlock(&ls->ls_rsbtbl[b].lock);
1819
1820 send_remove(r);
1821
1822 /* allow lookup of name again */
1823 spin_lock(&ls->ls_remove_spin);
1824 ls->ls_remove_len = 0;
1825 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
1826 spin_unlock(&ls->ls_remove_spin);
1827 wake_up(&ls->ls_remove_wait);
1828
1829 dlm_free_rsb(r);
1830 }
1831 }
1832
1833 void dlm_scan_rsbs(struct dlm_ls *ls)
1834 {
1835 int i;
1836
1837 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1838 shrink_bucket(ls, i);
1839 if (dlm_locking_stopped(ls))
1840 break;
1841 cond_resched();
1842 }
1843 }
1844
1845 static void add_timeout(struct dlm_lkb *lkb)
1846 {
1847 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1848
1849 if (is_master_copy(lkb))
1850 return;
1851
1852 if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1853 !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1854 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1855 goto add_it;
1856 }
1857 if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1858 goto add_it;
1859 return;
1860
1861 add_it:
1862 DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1863 mutex_lock(&ls->ls_timeout_mutex);
1864 hold_lkb(lkb);
1865 list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1866 mutex_unlock(&ls->ls_timeout_mutex);
1867 }
1868
1869 static void del_timeout(struct dlm_lkb *lkb)
1870 {
1871 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1872
1873 mutex_lock(&ls->ls_timeout_mutex);
1874 if (!list_empty(&lkb->lkb_time_list)) {
1875 list_del_init(&lkb->lkb_time_list);
1876 unhold_lkb(lkb);
1877 }
1878 mutex_unlock(&ls->ls_timeout_mutex);
1879 }
1880
1881 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1882 lkb_lksb_timeout without lock_rsb? Note: we can't lock timeout_mutex
1883 and then lock rsb because of lock ordering in add_timeout. We may need
1884 to specify some special timeout-related bits in the lkb that are just to
1885 be accessed under the timeout_mutex. */
1886
1887 void dlm_scan_timeout(struct dlm_ls *ls)
1888 {
1889 struct dlm_rsb *r;
1890 struct dlm_lkb *lkb = NULL, *iter;
1891 int do_cancel, do_warn;
1892 s64 wait_us;
1893
1894 for (;;) {
1895 if (dlm_locking_stopped(ls))
1896 break;
1897
1898 do_cancel = 0;
1899 do_warn = 0;
1900 mutex_lock(&ls->ls_timeout_mutex);
1901 list_for_each_entry(iter, &ls->ls_timeout, lkb_time_list) {
1902
1903 wait_us = ktime_to_us(ktime_sub(ktime_get(),
1904 iter->lkb_timestamp));
1905
1906 if ((iter->lkb_exflags & DLM_LKF_TIMEOUT) &&
1907 wait_us >= (iter->lkb_timeout_cs * 10000))
1908 do_cancel = 1;
1909
1910 if ((iter->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1911 wait_us >= dlm_config.ci_timewarn_cs * 10000)
1912 do_warn = 1;
1913
1914 if (!do_cancel && !do_warn)
1915 continue;
1916 hold_lkb(iter);
1917 lkb = iter;
1918 break;
1919 }
1920 mutex_unlock(&ls->ls_timeout_mutex);
1921
1922 if (!lkb)
1923 break;
1924
1925 r = lkb->lkb_resource;
1926 hold_rsb(r);
1927 lock_rsb(r);
1928
1929 if (do_warn) {
1930 /* clear flag so we only warn once */
1931 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1932 if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1933 del_timeout(lkb);
1934 dlm_timeout_warn(lkb);
1935 }
1936
1937 if (do_cancel) {
1938 log_debug(ls, "timeout cancel %x node %d %s",
1939 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1940 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1941 lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1942 del_timeout(lkb);
1943 _cancel_lock(r, lkb);
1944 }
1945
1946 unlock_rsb(r);
1947 unhold_rsb(r);
1948 dlm_put_lkb(lkb);
1949 }
1950 }
1951
1952 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1953 dlm_recoverd before checking/setting ls_recover_begin. */
1954
1955 void dlm_adjust_timeouts(struct dlm_ls *ls)
1956 {
1957 struct dlm_lkb *lkb;
1958 u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1959
1960 ls->ls_recover_begin = 0;
1961 mutex_lock(&ls->ls_timeout_mutex);
1962 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1963 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1964 mutex_unlock(&ls->ls_timeout_mutex);
1965
1966 if (!dlm_config.ci_waitwarn_us)
1967 return;
1968
1969 mutex_lock(&ls->ls_waiters_mutex);
1970 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1971 if (ktime_to_us(lkb->lkb_wait_time))
1972 lkb->lkb_wait_time = ktime_get();
1973 }
1974 mutex_unlock(&ls->ls_waiters_mutex);
1975 }
1976
1977 /* lkb is master or local copy */
1978
1979 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1980 {
1981 int b, len = r->res_ls->ls_lvblen;
1982
1983 /* b=1 lvb returned to caller
1984 b=0 lvb written to rsb or invalidated
1985 b=-1 do nothing */
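
	/* Illustrative examples, assuming the usual dlm_lvb_operations
	   table: a new request granted at EX (grmode IV) gives b=1, so
	   the rsb's lvb is copied out to the caller; a conversion down
	   from EX to NL gives b=0, so the caller's lvb is written into
	   the rsb and res_lvbseq is bumped. */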
1986
1987 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1988
1989 if (b == 1) {
1990 if (!lkb->lkb_lvbptr)
1991 return;
1992
1993 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1994 return;
1995
1996 if (!r->res_lvbptr)
1997 return;
1998
1999 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
2000 lkb->lkb_lvbseq = r->res_lvbseq;
2001
2002 } else if (b == 0) {
2003 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
2004 rsb_set_flag(r, RSB_VALNOTVALID);
2005 return;
2006 }
2007
2008 if (!lkb->lkb_lvbptr)
2009 return;
2010
2011 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2012 return;
2013
2014 if (!r->res_lvbptr)
2015 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
2016
2017 if (!r->res_lvbptr)
2018 return;
2019
2020 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
2021 r->res_lvbseq++;
2022 lkb->lkb_lvbseq = r->res_lvbseq;
2023 rsb_clear_flag(r, RSB_VALNOTVALID);
2024 }
2025
2026 if (rsb_flag(r, RSB_VALNOTVALID))
2027 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
2028 }
2029
2030 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2031 {
2032 if (lkb->lkb_grmode < DLM_LOCK_PW)
2033 return;
2034
2035 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
2036 rsb_set_flag(r, RSB_VALNOTVALID);
2037 return;
2038 }
2039
2040 if (!lkb->lkb_lvbptr)
2041 return;
2042
2043 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2044 return;
2045
2046 if (!r->res_lvbptr)
2047 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
2048
2049 if (!r->res_lvbptr)
2050 return;
2051
2052 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2053 r->res_lvbseq++;
2054 rsb_clear_flag(r, RSB_VALNOTVALID);
2055 }
2056
2057 /* lkb is process copy (pc) */
2058
2059 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2060 struct dlm_message *ms)
2061 {
2062 int b;
2063
2064 if (!lkb->lkb_lvbptr)
2065 return;
2066
2067 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2068 return;
2069
2070 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
2071 if (b == 1) {
2072 int len = receive_extralen(ms);
2073 if (len > r->res_ls->ls_lvblen)
2074 len = r->res_ls->ls_lvblen;
2075 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2076 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
2077 }
2078 }
2079
2080 /* Manipulate lkb's on rsb's convert/granted/waiting queues
2081 remove_lock -- used for unlock, removes lkb from granted
2082 revert_lock -- used for cancel, moves lkb from convert to granted
2083 grant_lock -- used for request and convert, adds lkb to granted or
2084 moves lkb from convert or waiting to granted
2085
2086 Each of these is used for master or local copy lkb's. There is
2087 also a _pc() variation used to make the corresponding change on
2088 a process copy (pc) lkb. */
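
   For example, a successful dlm_unlock() ends up in remove_lock() on the
   master node, while a canceled conversion is undone by revert_lock().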
2089
2090 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2091 {
2092 del_lkb(r, lkb);
2093 lkb->lkb_grmode = DLM_LOCK_IV;
2094 /* this unhold undoes the original ref from create_lkb()
2095 so this leads to the lkb being freed */
2096 unhold_lkb(lkb);
2097 }
2098
2099 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2100 {
2101 set_lvb_unlock(r, lkb);
2102 _remove_lock(r, lkb);
2103 }
2104
2105 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2106 {
2107 _remove_lock(r, lkb);
2108 }
2109
2110 /* returns: 0 did nothing
2111 1 moved lock to granted
2112 -1 removed lock */
2113
2114 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2115 {
2116 int rv = 0;
2117
2118 lkb->lkb_rqmode = DLM_LOCK_IV;
2119
2120 switch (lkb->lkb_status) {
2121 case DLM_LKSTS_GRANTED:
2122 break;
2123 case DLM_LKSTS_CONVERT:
2124 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2125 rv = 1;
2126 break;
2127 case DLM_LKSTS_WAITING:
2128 del_lkb(r, lkb);
2129 lkb->lkb_grmode = DLM_LOCK_IV;
2130 /* this unhold undoes the original ref from create_lkb()
2131 so this leads to the lkb being freed */
2132 unhold_lkb(lkb);
2133 rv = -1;
2134 break;
2135 default:
2136 log_print("invalid status for revert %d", lkb->lkb_status);
2137 }
2138 return rv;
2139 }
2140
2141 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2142 {
2143 return revert_lock(r, lkb);
2144 }
2145
2146 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2147 {
2148 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2149 lkb->lkb_grmode = lkb->lkb_rqmode;
2150 if (lkb->lkb_status)
2151 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2152 else
2153 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2154 }
2155
2156 lkb->lkb_rqmode = DLM_LOCK_IV;
2157 lkb->lkb_highbast = 0;
2158 }
2159
2160 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2161 {
2162 set_lvb_lock(r, lkb);
2163 _grant_lock(r, lkb);
2164 }
2165
2166 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2167 struct dlm_message *ms)
2168 {
2169 set_lvb_lock_pc(r, lkb, ms);
2170 _grant_lock(r, lkb);
2171 }
2172
2173 /* called by grant_pending_locks() which means an async grant message must
2174 be sent to the requesting node in addition to granting the lock if the
2175 lkb belongs to a remote node. */
2176
2177 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2178 {
2179 grant_lock(r, lkb);
2180 if (is_master_copy(lkb))
2181 send_grant(r, lkb);
2182 else
2183 queue_cast(r, lkb, 0);
2184 }
2185
2186 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2187 change the granted/requested modes. We're munging things accordingly in
2188 the process copy.
2189 CONVDEADLK: our grmode may have been forced down to NL to resolve a
2190 conversion deadlock
2191 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2192 compatible with other granted locks */
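
/* For example (illustrative): a PR->EX conversion that the master resolved
   as a conversion deadlock with DLM_LKF_CONVDEADLK set comes back with
   grmode forced to NL and DLM_SBF_DEMOTED in the status block flags;
   munge_demoted() applies the same change to our process copy. */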
2193
2194 static void munge_demoted(struct dlm_lkb *lkb)
2195 {
2196 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2197 log_print("munge_demoted %x invalid modes gr %d rq %d",
2198 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2199 return;
2200 }
2201
2202 lkb->lkb_grmode = DLM_LOCK_NL;
2203 }
2204
2205 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
2206 {
2207 if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
2208 ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
2209 log_print("munge_altmode %x invalid reply type %d",
2210 lkb->lkb_id, le32_to_cpu(ms->m_type));
2211 return;
2212 }
2213
2214 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2215 lkb->lkb_rqmode = DLM_LOCK_PR;
2216 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2217 lkb->lkb_rqmode = DLM_LOCK_CW;
2218 else {
2219 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2220 dlm_print_lkb(lkb);
2221 }
2222 }
2223
2224 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2225 {
2226 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2227 lkb_statequeue);
2228 if (lkb->lkb_id == first->lkb_id)
2229 return 1;
2230
2231 return 0;
2232 }
2233
2234 /* Check if the given lkb conflicts with another lkb on the queue. */
2235
2236 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2237 {
2238 struct dlm_lkb *this;
2239
2240 list_for_each_entry(this, head, lkb_statequeue) {
2241 if (this == lkb)
2242 continue;
2243 if (!modes_compat(this, lkb))
2244 return 1;
2245 }
2246 return 0;
2247 }
2248
2249 /*
2250 * "A conversion deadlock arises with a pair of lock requests in the converting
2251 * queue for one resource. The granted mode of each lock blocks the requested
2252 * mode of the other lock."
2253 *
2254 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2255 * convert queue from being granted, then deadlk/demote lkb.
2256 *
2257 * Example:
2258 * Granted Queue: empty
2259 * Convert Queue: NL->EX (first lock)
2260 * PR->EX (second lock)
2261 *
2262 * The first lock can't be granted because of the granted mode of the second
2263 * lock and the second lock can't be granted because it's not first in the
2264 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2265 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2266 * flag set and return DEMOTED in the lksb flags.
2267 *
2268 * Originally, this function detected conv-deadlk in a more limited scope:
2269 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2270 * - if lkb1 was the first entry in the queue (not just earlier), and was
2271 * blocked by the granted mode of lkb2, and there was nothing on the
2272 * granted queue preventing lkb1 from being granted immediately, i.e.
2273 * lkb2 was the only thing preventing lkb1 from being granted.
2274 *
2275 * That second condition meant we'd only say there was conv-deadlk if
2276 * resolving it (by demotion) would lead to the first lock on the convert
2277 * queue being granted right away. It allowed conversion deadlocks to exist
2278 * between locks on the convert queue while they couldn't be granted anyway.
2279 *
2280 * Now, we detect and take action on conversion deadlocks immediately when
2281 * they're created, even if they may not be immediately consequential. If
2282 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2283 * mode that would prevent lkb1's conversion from being granted, we do a
2284 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2285 * I think this means that the lkb_is_ahead condition below should always
2286 * be zero, i.e. there will never be conv-deadlk between two locks that are
2287 * both already on the convert queue.
2288 */
2289
2290 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2291 {
2292 struct dlm_lkb *lkb1;
2293 int lkb_is_ahead = 0;
2294
2295 list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2296 if (lkb1 == lkb2) {
2297 lkb_is_ahead = 1;
2298 continue;
2299 }
2300
2301 if (!lkb_is_ahead) {
2302 if (!modes_compat(lkb2, lkb1))
2303 return 1;
2304 } else {
2305 if (!modes_compat(lkb2, lkb1) &&
2306 !modes_compat(lkb1, lkb2))
2307 return 1;
2308 }
2309 }
2310 return 0;
2311 }
2312
2313 /*
2314 * Return 1 if the lock can be granted, 0 otherwise.
2315 * Also detect and resolve conversion deadlocks.
2316 *
2317 * lkb is the lock to be granted
2318 *
2319 * now is 1 if the function is being called in the context of the
2320 * immediate request, it is 0 if called later, after the lock has been
2321 * queued.
2322 *
2323 * recover is 1 if dlm_recover_grant() is trying to grant conversions
2324 * after recovery.
2325 *
2326 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2327 */
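
/* For example: do_request() and do_convert() call can_be_granted() with
   now=1 for the initial attempt, while grant_pending_convert() and
   grant_pending_wait() retry queued lkb's with now=0 (and recover set when
   re-granting conversions after recovery). */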
2328
2329 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2330 int recover)
2331 {
2332 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2333
2334 /*
2335 * 6-10: Version 5.4 introduced an option to address the phenomenon of
2336 * a new request for a NL mode lock being blocked.
2337 *
2338 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2339 * request, then it would be granted. In essence, the use of this flag
2340 	 * tells the Lock Manager to expedite this request by not considering
2341 * what may be in the CONVERTING or WAITING queues... As of this
2342 * writing, the EXPEDITE flag can be used only with new requests for NL
2343 * mode locks. This flag is not valid for conversion requests.
2344 *
2345 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
2346 * conversion or used with a non-NL requested mode. We also know an
2347 * EXPEDITE request is always granted immediately, so now must always
2348 * be 1. The full condition to grant an expedite request: (now &&
2349 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2350 * therefore be shortened to just checking the flag.
2351 */
2352
2353 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2354 return 1;
2355
2356 /*
2357 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2358 * added to the remaining conditions.
2359 */
2360
2361 if (queue_conflict(&r->res_grantqueue, lkb))
2362 return 0;
2363
2364 /*
2365 * 6-3: By default, a conversion request is immediately granted if the
2366 * requested mode is compatible with the modes of all other granted
2367 * locks
2368 */
2369
2370 if (queue_conflict(&r->res_convertqueue, lkb))
2371 return 0;
2372
2373 /*
2374 * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2375 * locks for a recovered rsb, on which lkb's have been rebuilt.
2376 * The lkb's may have been rebuilt on the queues in a different
2377 * order than they were in on the previous master. So, granting
2378 * queued conversions in order after recovery doesn't make sense
2379 * since the order hasn't been preserved anyway. The new order
2380 * could also have created a new "in place" conversion deadlock.
2381 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2382 * After recovery, there would be no granted locks, and possibly
2383 * NL->EX, PR->EX, an in-place conversion deadlock.) So, after
2384 * recovery, grant conversions without considering order.
2385 */
2386
2387 if (conv && recover)
2388 return 1;
2389
2390 /*
2391 * 6-5: But the default algorithm for deciding whether to grant or
2392 * queue conversion requests does not by itself guarantee that such
2393 * requests are serviced on a "first come first serve" basis. This, in
2394 	 * turn, can lead to a phenomenon known as "indefinite postponement".
2395 *
2396 * 6-7: This issue is dealt with by using the optional QUECVT flag with
2397 * the system service employed to request a lock conversion. This flag
2398 * forces certain conversion requests to be queued, even if they are
2399 * compatible with the granted modes of other locks on the same
2400 * resource. Thus, the use of this flag results in conversion requests
2401 	 * being ordered on a "first come first serve" basis.
2402 *
2403 * DCT: This condition is all about new conversions being able to occur
2404 * "in place" while the lock remains on the granted queue (assuming
2405 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
2406 * doesn't _have_ to go onto the convert queue where it's processed in
2407 * order. The "now" variable is necessary to distinguish converts
2408 * being received and processed for the first time now, because once a
2409 * convert is moved to the conversion queue the condition below applies
2410 * requiring fifo granting.
2411 */
2412
2413 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2414 return 1;
2415
2416 /*
2417 * Even if the convert is compat with all granted locks,
2418 * QUECVT forces it behind other locks on the convert queue.
2419 */
2420
2421 if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2422 if (list_empty(&r->res_convertqueue))
2423 return 1;
2424 else
2425 return 0;
2426 }
2427
2428 /*
2429 * The NOORDER flag is set to avoid the standard vms rules on grant
2430 * order.
2431 */
2432
2433 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2434 return 1;
2435
2436 /*
2437 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2438 * granted until all other conversion requests ahead of it are granted
2439 * and/or canceled.
2440 */
2441
2442 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2443 return 1;
2444
2445 /*
2446 * 6-4: By default, a new request is immediately granted only if all
2447 * three of the following conditions are satisfied when the request is
2448 * issued:
2449 * - The queue of ungranted conversion requests for the resource is
2450 * empty.
2451 * - The queue of ungranted new requests for the resource is empty.
2452 * - The mode of the new request is compatible with the most
2453 * restrictive mode of all granted locks on the resource.
2454 */
2455
2456 if (now && !conv && list_empty(&r->res_convertqueue) &&
2457 list_empty(&r->res_waitqueue))
2458 return 1;
2459
2460 /*
2461 * 6-4: Once a lock request is in the queue of ungranted new requests,
2462 * it cannot be granted until the queue of ungranted conversion
2463 * requests is empty, all ungranted new requests ahead of it are
2464 * granted and/or canceled, and it is compatible with the granted mode
2465 * of the most restrictive lock granted on the resource.
2466 */
2467
2468 if (!now && !conv && list_empty(&r->res_convertqueue) &&
2469 first_in_list(lkb, &r->res_waitqueue))
2470 return 1;
2471
2472 return 0;
2473 }
2474
2475 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2476 int recover, int *err)
2477 {
2478 int rv;
2479 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2480 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2481
2482 if (err)
2483 *err = 0;
2484
2485 rv = _can_be_granted(r, lkb, now, recover);
2486 if (rv)
2487 goto out;
2488
2489 /*
2490 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2491 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2492 * cancels one of the locks.
2493 */
2494
2495 if (is_convert && can_be_queued(lkb) &&
2496 conversion_deadlock_detect(r, lkb)) {
2497 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2498 lkb->lkb_grmode = DLM_LOCK_NL;
2499 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
2500 } else if (err) {
2501 *err = -EDEADLK;
2502 } else {
2503 log_print("can_be_granted deadlock %x now %d",
2504 lkb->lkb_id, now);
2505 dlm_dump_rsb(r);
2506 }
2507 goto out;
2508 }
2509
2510 /*
2511 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2512 * to grant a request in a mode other than the normal rqmode. It's a
2513 * simple way to provide a big optimization to applications that can
2514 * use them.
2515 */
2516
2517 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2518 alt = DLM_LOCK_PR;
2519 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2520 alt = DLM_LOCK_CW;
2521
2522 if (alt) {
2523 lkb->lkb_rqmode = alt;
2524 rv = _can_be_granted(r, lkb, now, 0);
2525 if (rv)
2526 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
2527 else
2528 lkb->lkb_rqmode = rqmode;
2529 }
2530 out:
2531 return rv;
2532 }
2533
2534 /* Returns the highest requested mode of all blocked conversions; sets
2535 cw if there's a blocked conversion to DLM_LOCK_CW. */
2536
2537 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2538 unsigned int *count)
2539 {
2540 struct dlm_lkb *lkb, *s;
2541 int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2542 int hi, demoted, quit, grant_restart, demote_restart;
2543 int deadlk;
2544
2545 quit = 0;
2546 restart:
2547 grant_restart = 0;
2548 demote_restart = 0;
2549 hi = DLM_LOCK_IV;
2550
2551 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2552 demoted = is_demoted(lkb);
2553 deadlk = 0;
2554
2555 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2556 grant_lock_pending(r, lkb);
2557 grant_restart = 1;
2558 if (count)
2559 (*count)++;
2560 continue;
2561 }
2562
2563 if (!demoted && is_demoted(lkb)) {
2564 log_print("WARN: pending demoted %x node %d %s",
2565 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2566 demote_restart = 1;
2567 continue;
2568 }
2569
2570 if (deadlk) {
2571 /*
2572 			 * If the DLM_LKF_NODLCKWT flag is set and a conversion
2573 			 * deadlock is detected, we queue a blocking AST so the
2574 			 * application can down-convert (or cancel) the conversion.
2575 */
2576 if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2577 if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2578 queue_bast(r, lkb, lkb->lkb_rqmode);
2579 lkb->lkb_highbast = lkb->lkb_rqmode;
2580 }
2581 } else {
2582 log_print("WARN: pending deadlock %x node %d %s",
2583 lkb->lkb_id, lkb->lkb_nodeid,
2584 r->res_name);
2585 dlm_dump_rsb(r);
2586 }
2587 continue;
2588 }
2589
2590 hi = max_t(int, lkb->lkb_rqmode, hi);
2591
2592 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2593 *cw = 1;
2594 }
2595
2596 if (grant_restart)
2597 goto restart;
2598 if (demote_restart && !quit) {
2599 quit = 1;
2600 goto restart;
2601 }
2602
2603 return max_t(int, high, hi);
2604 }
2605
2606 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2607 unsigned int *count)
2608 {
2609 struct dlm_lkb *lkb, *s;
2610
2611 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2612 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2613 grant_lock_pending(r, lkb);
2614 if (count)
2615 (*count)++;
2616 } else {
2617 high = max_t(int, lkb->lkb_rqmode, high);
2618 if (lkb->lkb_rqmode == DLM_LOCK_CW)
2619 *cw = 1;
2620 }
2621 }
2622
2623 return high;
2624 }
2625
2626 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2627 on either the convert or waiting queue.
2628 high is the largest rqmode of all locks blocked on the convert or
2629 waiting queue. */
2630
2631 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2632 {
2633 if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2634 if (gr->lkb_highbast < DLM_LOCK_EX)
2635 return 1;
2636 return 0;
2637 }
2638
2639 if (gr->lkb_highbast < high &&
2640 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2641 return 1;
2642 return 0;
2643 }
2644
2645 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2646 {
2647 struct dlm_lkb *lkb, *s;
2648 int high = DLM_LOCK_IV;
2649 int cw = 0;
2650
2651 if (!is_master(r)) {
2652 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2653 dlm_dump_rsb(r);
2654 return;
2655 }
2656
2657 high = grant_pending_convert(r, high, &cw, count);
2658 high = grant_pending_wait(r, high, &cw, count);
2659
2660 if (high == DLM_LOCK_IV)
2661 return;
2662
2663 /*
2664 * If there are locks left on the wait/convert queue then send blocking
2665 * ASTs to granted locks based on the largest requested mode (high)
2666 * found above.
2667 */
2668
2669 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2670 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2671 if (cw && high == DLM_LOCK_PR &&
2672 lkb->lkb_grmode == DLM_LOCK_PR)
2673 queue_bast(r, lkb, DLM_LOCK_CW);
2674 else
2675 queue_bast(r, lkb, high);
2676 lkb->lkb_highbast = high;
2677 }
2678 }
2679 }
2680
2681 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2682 {
2683 if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2684 (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2685 if (gr->lkb_highbast < DLM_LOCK_EX)
2686 return 1;
2687 return 0;
2688 }
2689
2690 if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2691 return 1;
2692 return 0;
2693 }
2694
2695 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2696 struct dlm_lkb *lkb)
2697 {
2698 struct dlm_lkb *gr;
2699
2700 list_for_each_entry(gr, head, lkb_statequeue) {
2701 /* skip self when sending basts to convertqueue */
2702 if (gr == lkb)
2703 continue;
2704 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2705 queue_bast(r, gr, lkb->lkb_rqmode);
2706 gr->lkb_highbast = lkb->lkb_rqmode;
2707 }
2708 }
2709 }
2710
2711 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2712 {
2713 send_bast_queue(r, &r->res_grantqueue, lkb);
2714 }
2715
2716 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2717 {
2718 send_bast_queue(r, &r->res_grantqueue, lkb);
2719 send_bast_queue(r, &r->res_convertqueue, lkb);
2720 }
2721
2722 /* set_master(r, lkb) -- set the master nodeid of a resource
2723
2724 The purpose of this function is to set the nodeid field in the given
2725 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
2726 known, it can just be copied to the lkb and the function will return
2727 0. If the rsb's nodeid is _not_ known, it needs to be looked up
2728 before it can be copied to the lkb.
2729
2730 When the rsb nodeid is being looked up remotely, the initial lkb
2731 causing the lookup is kept on the ls_waiters list waiting for the
2732 lookup reply. Other lkb's waiting for the same rsb lookup are kept
2733 on the rsb's res_lookup list until the master is verified.
2734
2735 Return values:
2736 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2737 1: the rsb master is not available and the lkb has been placed on
2738 a wait queue
2739 */
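
/* In practice (see _request_lock()): a return of 0 lets the caller go ahead
   with do_request() or send_request() immediately, while a return of 1 means
   the lkb is parked (on ls_waiters or res_lookup) until the lookup reply or
   master confirmation arrives. */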
2740
2741 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2742 {
2743 int our_nodeid = dlm_our_nodeid();
2744
2745 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2746 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2747 r->res_first_lkid = lkb->lkb_id;
2748 lkb->lkb_nodeid = r->res_nodeid;
2749 return 0;
2750 }
2751
2752 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2753 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2754 return 1;
2755 }
2756
2757 if (r->res_master_nodeid == our_nodeid) {
2758 lkb->lkb_nodeid = 0;
2759 return 0;
2760 }
2761
2762 if (r->res_master_nodeid) {
2763 lkb->lkb_nodeid = r->res_master_nodeid;
2764 return 0;
2765 }
2766
2767 if (dlm_dir_nodeid(r) == our_nodeid) {
2768 /* This is a somewhat unusual case; find_rsb will usually
2769 have set res_master_nodeid when dir nodeid is local, but
2770 there are cases where we become the dir node after we've
2771 		   passed find_rsb and go through _request_lock again.
2772 confirm_master() or process_lookup_list() needs to be
2773 called after this. */
2774 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2775 lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2776 r->res_name);
2777 r->res_master_nodeid = our_nodeid;
2778 r->res_nodeid = 0;
2779 lkb->lkb_nodeid = 0;
2780 return 0;
2781 }
2782
2783 wait_pending_remove(r);
2784
2785 r->res_first_lkid = lkb->lkb_id;
2786 send_lookup(r, lkb);
2787 return 1;
2788 }
2789
2790 static void process_lookup_list(struct dlm_rsb *r)
2791 {
2792 struct dlm_lkb *lkb, *safe;
2793
2794 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2795 list_del_init(&lkb->lkb_rsb_lookup);
2796 _request_lock(r, lkb);
2797 schedule();
2798 }
2799 }
2800
2801 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2802
2803 static void confirm_master(struct dlm_rsb *r, int error)
2804 {
2805 struct dlm_lkb *lkb;
2806
2807 if (!r->res_first_lkid)
2808 return;
2809
2810 switch (error) {
2811 case 0:
2812 case -EINPROGRESS:
2813 r->res_first_lkid = 0;
2814 process_lookup_list(r);
2815 break;
2816
2817 case -EAGAIN:
2818 case -EBADR:
2819 case -ENOTBLK:
2820 /* the remote request failed and won't be retried (it was
2821 a NOQUEUE, or has been canceled/unlocked); make a waiting
2822 lkb the first_lkid */
2823
2824 r->res_first_lkid = 0;
2825
2826 if (!list_empty(&r->res_lookup)) {
2827 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2828 lkb_rsb_lookup);
2829 list_del_init(&lkb->lkb_rsb_lookup);
2830 r->res_first_lkid = lkb->lkb_id;
2831 _request_lock(r, lkb);
2832 }
2833 break;
2834
2835 default:
2836 log_error(r->res_ls, "confirm_master unknown error %d", error);
2837 }
2838 }
2839
2840 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2841 int namelen, unsigned long timeout_cs,
2842 void (*ast) (void *astparam),
2843 void *astparam,
2844 void (*bast) (void *astparam, int mode),
2845 struct dlm_args *args)
2846 {
2847 int rv = -EINVAL;
2848
2849 /* check for invalid arg usage */
2850
2851 if (mode < 0 || mode > DLM_LOCK_EX)
2852 goto out;
2853
2854 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2855 goto out;
2856
2857 if (flags & DLM_LKF_CANCEL)
2858 goto out;
2859
2860 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2861 goto out;
2862
2863 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2864 goto out;
2865
2866 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2867 goto out;
2868
2869 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2870 goto out;
2871
2872 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2873 goto out;
2874
2875 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2876 goto out;
2877
2878 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2879 goto out;
2880
2881 if (!ast || !lksb)
2882 goto out;
2883
2884 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2885 goto out;
2886
2887 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2888 goto out;
2889
2890 /* these args will be copied to the lkb in validate_lock_args,
2891 it cannot be done now because when converting locks, fields in
2892 an active lkb cannot be modified before locking the rsb */
2893
2894 args->flags = flags;
2895 args->astfn = ast;
2896 args->astparam = astparam;
2897 args->bastfn = bast;
2898 args->timeout = timeout_cs;
2899 args->mode = mode;
2900 args->lksb = lksb;
2901 rv = 0;
2902 out:
2903 return rv;
2904 }
2905
2906 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2907 {
2908 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2909 DLM_LKF_FORCEUNLOCK))
2910 return -EINVAL;
2911
2912 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2913 return -EINVAL;
2914
2915 args->flags = flags;
2916 args->astparam = astarg;
2917 return 0;
2918 }
2919
2920 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2921 struct dlm_args *args)
2922 {
2923 int rv = -EINVAL;
2924
2925 if (args->flags & DLM_LKF_CONVERT) {
2926 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2927 goto out;
2928
2929 if (args->flags & DLM_LKF_QUECVT &&
2930 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2931 goto out;
2932
2933 rv = -EBUSY;
2934 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2935 goto out;
2936
2937 /* lock not allowed if there's any op in progress */
2938 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2939 goto out;
2940
2941 if (is_overlap(lkb))
2942 goto out;
2943 }
2944
2945 lkb->lkb_exflags = args->flags;
2946 lkb->lkb_sbflags = 0;
2947 lkb->lkb_astfn = args->astfn;
2948 lkb->lkb_astparam = args->astparam;
2949 lkb->lkb_bastfn = args->bastfn;
2950 lkb->lkb_rqmode = args->mode;
2951 lkb->lkb_lksb = args->lksb;
2952 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2953 lkb->lkb_ownpid = (int) current->pid;
2954 lkb->lkb_timeout_cs = args->timeout;
2955 rv = 0;
2956 out:
2957 if (rv)
2958 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2959 rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2960 lkb->lkb_status, lkb->lkb_wait_type,
2961 lkb->lkb_resource->res_name);
2962 return rv;
2963 }
2964
2965 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2966 for success */
2967
2968 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2969 because there may be a lookup in progress and it's valid to do
2970 cancel/unlockf on it */
2971
2972 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2973 {
2974 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2975 int rv = -EINVAL;
2976
2977 if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2978 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2979 dlm_print_lkb(lkb);
2980 goto out;
2981 }
2982
2983 /* an lkb may still exist even though the lock is EOL'ed due to a
2984 cancel, unlock or failed noqueue request; an app can't use these
2985 locks; return same error as if the lkid had not been found at all */
2986
2987 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2988 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2989 rv = -ENOENT;
2990 goto out;
2991 }
2992
2993 /* an lkb may be waiting for an rsb lookup to complete where the
2994 lookup was initiated by another lock */
2995
2996 if (!list_empty(&lkb->lkb_rsb_lookup)) {
2997 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2998 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2999 list_del_init(&lkb->lkb_rsb_lookup);
3000 queue_cast(lkb->lkb_resource, lkb,
3001 args->flags & DLM_LKF_CANCEL ?
3002 -DLM_ECANCEL : -DLM_EUNLOCK);
3003 unhold_lkb(lkb); /* undoes create_lkb() */
3004 }
3005 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
3006 rv = -EBUSY;
3007 goto out;
3008 }
3009
3010 /* cancel not allowed with another cancel/unlock in progress */
3011
3012 if (args->flags & DLM_LKF_CANCEL) {
3013 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
3014 goto out;
3015
3016 if (is_overlap(lkb))
3017 goto out;
3018
3019 /* don't let scand try to do a cancel */
3020 del_timeout(lkb);
3021
3022 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3023 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
3024 rv = -EBUSY;
3025 goto out;
3026 }
3027
3028 /* there's nothing to cancel */
3029 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
3030 !lkb->lkb_wait_type) {
3031 rv = -EBUSY;
3032 goto out;
3033 }
3034
3035 switch (lkb->lkb_wait_type) {
3036 case DLM_MSG_LOOKUP:
3037 case DLM_MSG_REQUEST:
3038 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
3039 rv = -EBUSY;
3040 goto out;
3041 case DLM_MSG_UNLOCK:
3042 case DLM_MSG_CANCEL:
3043 goto out;
3044 }
3045 /* add_to_waiters() will set OVERLAP_CANCEL */
3046 goto out_ok;
3047 }
3048
3049 /* do we need to allow a force-unlock if there's a normal unlock
3050 already in progress? in what conditions could the normal unlock
3051 fail such that we'd want to send a force-unlock to be sure? */
3052
3053 if (args->flags & DLM_LKF_FORCEUNLOCK) {
3054 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
3055 goto out;
3056
3057 if (is_overlap_unlock(lkb))
3058 goto out;
3059
3060 /* don't let scand try to do a cancel */
3061 del_timeout(lkb);
3062
3063 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3064 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3065 rv = -EBUSY;
3066 goto out;
3067 }
3068
3069 switch (lkb->lkb_wait_type) {
3070 case DLM_MSG_LOOKUP:
3071 case DLM_MSG_REQUEST:
3072 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3073 rv = -EBUSY;
3074 goto out;
3075 case DLM_MSG_UNLOCK:
3076 goto out;
3077 }
3078 /* add_to_waiters() will set OVERLAP_UNLOCK */
3079 goto out_ok;
3080 }
3081
3082 /* normal unlock not allowed if there's any op in progress */
3083 rv = -EBUSY;
3084 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
3085 goto out;
3086
3087 out_ok:
3088 /* an overlapping op shouldn't blow away exflags from other op */
3089 lkb->lkb_exflags |= args->flags;
3090 lkb->lkb_sbflags = 0;
3091 lkb->lkb_astparam = args->astparam;
3092 rv = 0;
3093 out:
3094 if (rv)
3095 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
3096 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
3097 args->flags, lkb->lkb_wait_type,
3098 lkb->lkb_resource->res_name);
3099 return rv;
3100 }
3101
3102 /*
3103 * Four stage 4 varieties:
3104 * do_request(), do_convert(), do_unlock(), do_cancel()
3105 * These are called on the master node for the given lock and
3106 * from the central locking logic.
3107 */
3108
3109 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3110 {
3111 int error = 0;
3112
3113 if (can_be_granted(r, lkb, 1, 0, NULL)) {
3114 grant_lock(r, lkb);
3115 queue_cast(r, lkb, 0);
3116 goto out;
3117 }
3118
3119 if (can_be_queued(lkb)) {
3120 error = -EINPROGRESS;
3121 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3122 add_timeout(lkb);
3123 goto out;
3124 }
3125
3126 error = -EAGAIN;
3127 queue_cast(r, lkb, -EAGAIN);
3128 out:
3129 return error;
3130 }
3131
3132 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3133 int error)
3134 {
3135 switch (error) {
3136 case -EAGAIN:
3137 if (force_blocking_asts(lkb))
3138 send_blocking_asts_all(r, lkb);
3139 break;
3140 case -EINPROGRESS:
3141 send_blocking_asts(r, lkb);
3142 break;
3143 }
3144 }
3145
3146 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3147 {
3148 int error = 0;
3149 int deadlk = 0;
3150
3151 /* changing an existing lock may allow others to be granted */
3152
3153 if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3154 grant_lock(r, lkb);
3155 queue_cast(r, lkb, 0);
3156 goto out;
3157 }
3158
3159 /* can_be_granted() detected that this lock would block in a conversion
3160 deadlock, so we leave it on the granted queue and return EDEADLK in
3161 the ast for the convert. */
3162
3163 if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3164 /* it's left on the granted queue */
3165 revert_lock(r, lkb);
3166 queue_cast(r, lkb, -EDEADLK);
3167 error = -EDEADLK;
3168 goto out;
3169 }
3170
3171 /* is_demoted() means the can_be_granted() above set the grmode
3172 to NL, and left us on the granted queue. This auto-demotion
3173 (due to CONVDEADLK) might mean other locks, and/or this lock, are
3174 now grantable. We have to try to grant other converting locks
3175 before we try again to grant this one. */
3176
3177 if (is_demoted(lkb)) {
3178 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3179 if (_can_be_granted(r, lkb, 1, 0)) {
3180 grant_lock(r, lkb);
3181 queue_cast(r, lkb, 0);
3182 goto out;
3183 }
3184 /* else fall through and move to convert queue */
3185 }
3186
3187 if (can_be_queued(lkb)) {
3188 error = -EINPROGRESS;
3189 del_lkb(r, lkb);
3190 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3191 add_timeout(lkb);
3192 goto out;
3193 }
3194
3195 error = -EAGAIN;
3196 queue_cast(r, lkb, -EAGAIN);
3197 out:
3198 return error;
3199 }
3200
3201 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3202 int error)
3203 {
3204 switch (error) {
3205 case 0:
3206 grant_pending_locks(r, NULL);
3207 /* grant_pending_locks also sends basts */
3208 break;
3209 case -EAGAIN:
3210 if (force_blocking_asts(lkb))
3211 send_blocking_asts_all(r, lkb);
3212 break;
3213 case -EINPROGRESS:
3214 send_blocking_asts(r, lkb);
3215 break;
3216 }
3217 }
3218
3219 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3220 {
3221 remove_lock(r, lkb);
3222 queue_cast(r, lkb, -DLM_EUNLOCK);
3223 return -DLM_EUNLOCK;
3224 }
3225
3226 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3227 int error)
3228 {
3229 grant_pending_locks(r, NULL);
3230 }
3231
3232 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3233
3234 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3235 {
3236 int error;
3237
3238 error = revert_lock(r, lkb);
3239 if (error) {
3240 queue_cast(r, lkb, -DLM_ECANCEL);
3241 return -DLM_ECANCEL;
3242 }
3243 return 0;
3244 }
3245
3246 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3247 int error)
3248 {
3249 if (error)
3250 grant_pending_locks(r, NULL);
3251 }
3252
3253 /*
3254 * Four stage 3 varieties:
3255 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3256 */
3257
3258 /* add a new lkb to a possibly new rsb, called by requesting process */
3259
3260 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3261 {
3262 int error;
3263
3264 /* set_master: sets lkb nodeid from r */
3265
3266 error = set_master(r, lkb);
3267 if (error < 0)
3268 goto out;
3269 if (error) {
3270 error = 0;
3271 goto out;
3272 }
3273
3274 if (is_remote(r)) {
3275 /* receive_request() calls do_request() on remote node */
3276 error = send_request(r, lkb);
3277 } else {
3278 error = do_request(r, lkb);
3279 /* for remote locks the request_reply is sent
3280 between do_request and do_request_effects */
3281 do_request_effects(r, lkb, error);
3282 }
3283 out:
3284 return error;
3285 }
3286
3287 /* change some property of an existing lkb, e.g. mode */
3288
3289 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3290 {
3291 int error;
3292
3293 if (is_remote(r)) {
3294 /* receive_convert() calls do_convert() on remote node */
3295 error = send_convert(r, lkb);
3296 } else {
3297 error = do_convert(r, lkb);
3298 /* for remote locks the convert_reply is sent
3299 between do_convert and do_convert_effects */
3300 do_convert_effects(r, lkb, error);
3301 }
3302
3303 return error;
3304 }
3305
3306 /* remove an existing lkb from the granted queue */
3307
3308 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3309 {
3310 int error;
3311
3312 if (is_remote(r)) {
3313 /* receive_unlock() calls do_unlock() on remote node */
3314 error = send_unlock(r, lkb);
3315 } else {
3316 error = do_unlock(r, lkb);
3317 /* for remote locks the unlock_reply is sent
3318 between do_unlock and do_unlock_effects */
3319 do_unlock_effects(r, lkb, error);
3320 }
3321
3322 return error;
3323 }
3324
3325 /* remove an existing lkb from the convert or wait queue */
3326
3327 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3328 {
3329 int error;
3330
3331 if (is_remote(r)) {
3332 /* receive_cancel() calls do_cancel() on remote node */
3333 error = send_cancel(r, lkb);
3334 } else {
3335 error = do_cancel(r, lkb);
3336 /* for remote locks the cancel_reply is sent
3337 between do_cancel and do_cancel_effects */
3338 do_cancel_effects(r, lkb, error);
3339 }
3340
3341 return error;
3342 }
3343
3344 /*
3345 * Four stage 2 varieties:
3346 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3347 */
3348
3349 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
3350 int len, struct dlm_args *args)
3351 {
3352 struct dlm_rsb *r;
3353 int error;
3354
3355 error = validate_lock_args(ls, lkb, args);
3356 if (error)
3357 return error;
3358
3359 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3360 if (error)
3361 return error;
3362
3363 lock_rsb(r);
3364
3365 attach_lkb(r, lkb);
3366 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3367
3368 error = _request_lock(r, lkb);
3369
3370 unlock_rsb(r);
3371 put_rsb(r);
3372 return error;
3373 }
3374
3375 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3376 struct dlm_args *args)
3377 {
3378 struct dlm_rsb *r;
3379 int error;
3380
3381 r = lkb->lkb_resource;
3382
3383 hold_rsb(r);
3384 lock_rsb(r);
3385
3386 error = validate_lock_args(ls, lkb, args);
3387 if (error)
3388 goto out;
3389
3390 error = _convert_lock(r, lkb);
3391 out:
3392 unlock_rsb(r);
3393 put_rsb(r);
3394 return error;
3395 }
3396
3397 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3398 struct dlm_args *args)
3399 {
3400 struct dlm_rsb *r;
3401 int error;
3402
3403 r = lkb->lkb_resource;
3404
3405 hold_rsb(r);
3406 lock_rsb(r);
3407
3408 error = validate_unlock_args(lkb, args);
3409 if (error)
3410 goto out;
3411
3412 error = _unlock_lock(r, lkb);
3413 out:
3414 unlock_rsb(r);
3415 put_rsb(r);
3416 return error;
3417 }
3418
3419 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3420 struct dlm_args *args)
3421 {
3422 struct dlm_rsb *r;
3423 int error;
3424
3425 r = lkb->lkb_resource;
3426
3427 hold_rsb(r);
3428 lock_rsb(r);
3429
3430 error = validate_unlock_args(lkb, args);
3431 if (error)
3432 goto out;
3433
3434 error = _cancel_lock(r, lkb);
3435 out:
3436 unlock_rsb(r);
3437 put_rsb(r);
3438 return error;
3439 }
3440
3441 /*
3442 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
3443 */
3444
3445 int dlm_lock(dlm_lockspace_t *lockspace,
3446 int mode,
3447 struct dlm_lksb *lksb,
3448 uint32_t flags,
3449 void *name,
3450 unsigned int namelen,
3451 uint32_t parent_lkid,
3452 void (*ast) (void *astarg),
3453 void *astarg,
3454 void (*bast) (void *astarg, int mode))
3455 {
3456 struct dlm_ls *ls;
3457 struct dlm_lkb *lkb;
3458 struct dlm_args args;
3459 int error, convert = flags & DLM_LKF_CONVERT;
3460
3461 ls = dlm_find_lockspace_local(lockspace);
3462 if (!ls)
3463 return -EINVAL;
3464
3465 dlm_lock_recovery(ls);
3466
3467 if (convert)
3468 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3469 else
3470 error = create_lkb(ls, &lkb);
3471
3472 if (error)
3473 goto out;
3474
3475 trace_dlm_lock_start(ls, lkb, mode, flags);
3476
3477 error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
3478 astarg, bast, &args);
3479 if (error)
3480 goto out_put;
3481
3482 if (convert)
3483 error = convert_lock(ls, lkb, &args);
3484 else
3485 error = request_lock(ls, lkb, name, namelen, &args);
3486
3487 if (error == -EINPROGRESS)
3488 error = 0;
3489 out_put:
3490 trace_dlm_lock_end(ls, lkb, mode, flags, error);
3491
3492 if (convert || error)
3493 __put_lkb(ls, lkb);
3494 if (error == -EAGAIN || error == -EDEADLK)
3495 error = 0;
3496 out:
3497 dlm_unlock_recovery(ls);
3498 dlm_put_lockspace(ls);
3499 return error;
3500 }
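
/*
 * Illustrative usage only (not part of this file): a kernel caller might
 * acquire an EX lock asynchronously, roughly like this, where "my_ast" and
 * "my_bast" are caller-provided callbacks and "res" names the resource:
 *
 *	struct dlm_lksb lksb = {};
 *	int rv;
 *
 *	rv = dlm_lock(ls, DLM_LOCK_EX, &lksb, DLM_LKF_NOQUEUE,
 *		      "res", 3, 0, my_ast, &lksb, my_bast);
 *
 * A 0 return only means the request was accepted; the final result arrives
 * through my_ast() with lksb.sb_status holding the status and lksb.sb_lkid
 * the lock id to use for later converts or unlocks.
 */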
3501
3502 int dlm_unlock(dlm_lockspace_t *lockspace,
3503 uint32_t lkid,
3504 uint32_t flags,
3505 struct dlm_lksb *lksb,
3506 void *astarg)
3507 {
3508 struct dlm_ls *ls;
3509 struct dlm_lkb *lkb;
3510 struct dlm_args args;
3511 int error;
3512
3513 ls = dlm_find_lockspace_local(lockspace);
3514 if (!ls)
3515 return -EINVAL;
3516
3517 dlm_lock_recovery(ls);
3518
3519 error = find_lkb(ls, lkid, &lkb);
3520 if (error)
3521 goto out;
3522
3523 trace_dlm_unlock_start(ls, lkb, flags);
3524
3525 error = set_unlock_args(flags, astarg, &args);
3526 if (error)
3527 goto out_put;
3528
3529 if (flags & DLM_LKF_CANCEL)
3530 error = cancel_lock(ls, lkb, &args);
3531 else
3532 error = unlock_lock(ls, lkb, &args);
3533
3534 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3535 error = 0;
3536 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3537 error = 0;
3538 out_put:
3539 trace_dlm_unlock_end(ls, lkb, flags, error);
3540
3541 dlm_put_lkb(lkb);
3542 out:
3543 dlm_unlock_recovery(ls);
3544 dlm_put_lockspace(ls);
3545 return error;
3546 }
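
/*
 * Illustrative usage only: releasing the lock acquired above would pass the
 * saved lkid, e.g.
 *
 *	rv = dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, &lksb);
 *
 * or DLM_LKF_CANCEL in flags to cancel a request that is still queued.  The
 * -DLM_EUNLOCK / -DLM_ECANCEL completion is delivered through the ast and
 * mapped to a 0 return here.
 */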
3547
3548 /*
3549 * send/receive routines for remote operations and replies
3550 *
3551 * send_args
3552 * send_common
3553 * send_request receive_request
3554 * send_convert receive_convert
3555 * send_unlock receive_unlock
3556 * send_cancel receive_cancel
3557 * send_grant receive_grant
3558 * send_bast receive_bast
3559 * send_lookup receive_lookup
3560 * send_remove receive_remove
3561 *
3562 * send_common_reply
3563 * receive_request_reply send_request_reply
3564 * receive_convert_reply send_convert_reply
3565 * receive_unlock_reply send_unlock_reply
3566 * receive_cancel_reply send_cancel_reply
3567 * receive_lookup_reply send_lookup_reply
3568 */
3569
3570 static int _create_message(struct dlm_ls *ls, int mb_len,
3571 int to_nodeid, int mstype,
3572 struct dlm_message **ms_ret,
3573 struct dlm_mhandle **mh_ret)
3574 {
3575 struct dlm_message *ms;
3576 struct dlm_mhandle *mh;
3577 char *mb;
3578
3579 /* get_buffer gives us a message handle (mh) that we need to
3580 pass into midcomms_commit and a message buffer (mb) that we
3581 write our data into */
3582
3583 mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb);
3584 if (!mh)
3585 return -ENOBUFS;
3586
3587 ms = (struct dlm_message *) mb;
3588
3589 ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3590 ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3591 ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3592 ms->m_header.h_length = cpu_to_le16(mb_len);
3593 ms->m_header.h_cmd = DLM_MSG;
3594
3595 ms->m_type = cpu_to_le32(mstype);
3596
3597 *mh_ret = mh;
3598 *ms_ret = ms;
3599 return 0;
3600 }
3601
3602 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3603 int to_nodeid, int mstype,
3604 struct dlm_message **ms_ret,
3605 struct dlm_mhandle **mh_ret)
3606 {
3607 int mb_len = sizeof(struct dlm_message);
3608
3609 switch (mstype) {
3610 case DLM_MSG_REQUEST:
3611 case DLM_MSG_LOOKUP:
3612 case DLM_MSG_REMOVE:
3613 mb_len += r->res_length;
3614 break;
3615 case DLM_MSG_CONVERT:
3616 case DLM_MSG_UNLOCK:
3617 case DLM_MSG_REQUEST_REPLY:
3618 case DLM_MSG_CONVERT_REPLY:
3619 case DLM_MSG_GRANT:
3620 if (lkb && lkb->lkb_lvbptr)
3621 mb_len += r->res_ls->ls_lvblen;
3622 break;
3623 }
3624
3625 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3626 ms_ret, mh_ret);
3627 }
3628
3629 /* further lowcomms enhancements or alternate implementations may make
3630 the return value from this function useful at some point */
3631
3632 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
3633 {
3634 dlm_midcomms_commit_mhandle(mh);
3635 return 0;
3636 }
3637
3638 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3639 struct dlm_message *ms)
3640 {
3641 ms->m_nodeid = cpu_to_le32(lkb->lkb_nodeid);
3642 ms->m_pid = cpu_to_le32(lkb->lkb_ownpid);
3643 ms->m_lkid = cpu_to_le32(lkb->lkb_id);
3644 ms->m_remid = cpu_to_le32(lkb->lkb_remid);
3645 ms->m_exflags = cpu_to_le32(lkb->lkb_exflags);
3646 ms->m_sbflags = cpu_to_le32(lkb->lkb_sbflags);
3647 ms->m_flags = cpu_to_le32(lkb->lkb_flags);
3648 ms->m_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
3649 ms->m_status = cpu_to_le32(lkb->lkb_status);
3650 ms->m_grmode = cpu_to_le32(lkb->lkb_grmode);
3651 ms->m_rqmode = cpu_to_le32(lkb->lkb_rqmode);
3652 ms->m_hash = cpu_to_le32(r->res_hash);
3653
3654 /* m_result and m_bastmode are set from function args,
3655 not from lkb fields */
3656
3657 if (lkb->lkb_bastfn)
3658 ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3659 if (lkb->lkb_astfn)
3660 ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3661
3662 /* compare with switch in create_message; send_remove() doesn't
3663 use send_args() */
3664
3665 switch (ms->m_type) {
3666 case cpu_to_le32(DLM_MSG_REQUEST):
3667 case cpu_to_le32(DLM_MSG_LOOKUP):
3668 memcpy(ms->m_extra, r->res_name, r->res_length);
3669 break;
3670 case cpu_to_le32(DLM_MSG_CONVERT):
3671 case cpu_to_le32(DLM_MSG_UNLOCK):
3672 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3673 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3674 case cpu_to_le32(DLM_MSG_GRANT):
3675 if (!lkb->lkb_lvbptr)
3676 break;
3677 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3678 break;
3679 }
3680 }
3681
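/* send_common adds the lkb to the lockspace waiters list before building
   and sending the message so the eventual reply can be matched to it;
   if message creation or sending fails, the waiter entry is removed */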
3682 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3683 {
3684 struct dlm_message *ms;
3685 struct dlm_mhandle *mh;
3686 int to_nodeid, error;
3687
3688 to_nodeid = r->res_nodeid;
3689
3690 error = add_to_waiters(lkb, mstype, to_nodeid);
3691 if (error)
3692 return error;
3693
3694 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3695 if (error)
3696 goto fail;
3697
3698 send_args(r, lkb, ms);
3699
3700 error = send_message(mh, ms);
3701 if (error)
3702 goto fail;
3703 return 0;
3704
3705 fail:
3706 remove_from_waiters(lkb, msg_reply_type(mstype));
3707 return error;
3708 }
3709
3710 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3711 {
3712 return send_common(r, lkb, DLM_MSG_REQUEST);
3713 }
3714
3715 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3716 {
3717 int error;
3718
3719 error = send_common(r, lkb, DLM_MSG_CONVERT);
3720
3721 /* down conversions go without a reply from the master */
3722 if (!error && down_conversion(lkb)) {
3723 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3724 r->res_ls->ls_stub_ms.m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
3725 r->res_ls->ls_stub_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3726 r->res_ls->ls_stub_ms.m_result = 0;
3727 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3728 }
3729
3730 return error;
3731 }
3732
3733 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3734 MASTER_UNCERTAIN to force the next request on the rsb to confirm
3735 that the master is still correct. */
3736
3737 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3738 {
3739 return send_common(r, lkb, DLM_MSG_UNLOCK);
3740 }
3741
3742 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3743 {
3744 return send_common(r, lkb, DLM_MSG_CANCEL);
3745 }
3746
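/* grant and bast messages go from the master to the node holding the
   process copy of the lock; per the table above they have no reply, so
   no waiters entry is added for them */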
3747 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3748 {
3749 struct dlm_message *ms;
3750 struct dlm_mhandle *mh;
3751 int to_nodeid, error;
3752
3753 to_nodeid = lkb->lkb_nodeid;
3754
3755 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3756 if (error)
3757 goto out;
3758
3759 send_args(r, lkb, ms);
3760
3761 ms->m_result = 0;
3762
3763 error = send_message(mh, ms);
3764 out:
3765 return error;
3766 }
3767
3768 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3769 {
3770 struct dlm_message *ms;
3771 struct dlm_mhandle *mh;
3772 int to_nodeid, error;
3773
3774 to_nodeid = lkb->lkb_nodeid;
3775
3776 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3777 if (error)
3778 goto out;
3779
3780 send_args(r, lkb, ms);
3781
3782 ms->m_bastmode = cpu_to_le32(mode);
3783
3784 error = send_message(mh, ms);
3785 out:
3786 return error;
3787 }
3788
3789 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3790 {
3791 struct dlm_message *ms;
3792 struct dlm_mhandle *mh;
3793 int to_nodeid, error;
3794
3795 to_nodeid = dlm_dir_nodeid(r);
3796
3797 error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3798 if (error)
3799 return error;
3800
3801 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3802 if (error)
3803 goto fail;
3804
3805 send_args(r, lkb, ms);
3806
3807 error = send_message(mh, ms);
3808 if (error)
3809 goto fail;
3810 return 0;
3811
3812 fail:
3813 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3814 return error;
3815 }
3816
3817 static int send_remove(struct dlm_rsb *r)
3818 {
3819 struct dlm_message *ms;
3820 struct dlm_mhandle *mh;
3821 int to_nodeid, error;
3822
3823 to_nodeid = dlm_dir_nodeid(r);
3824
3825 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3826 if (error)
3827 goto out;
3828
3829 memcpy(ms->m_extra, r->res_name, r->res_length);
3830 ms->m_hash = cpu_to_le32(r->res_hash);
3831
3832 error = send_message(mh, ms);
3833 out:
3834 return error;
3835 }
3836
3837 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3838 int mstype, int rv)
3839 {
3840 struct dlm_message *ms;
3841 struct dlm_mhandle *mh;
3842 int to_nodeid, error;
3843
3844 to_nodeid = lkb->lkb_nodeid;
3845
3846 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3847 if (error)
3848 goto out;
3849
3850 send_args(r, lkb, ms);
3851
3852 ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3853
3854 error = send_message(mh, ms);
3855 out:
3856 return error;
3857 }
3858
3859 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3860 {
3861 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3862 }
3863
3864 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3865 {
3866 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3867 }
3868
3869 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3870 {
3871 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3872 }
3873
3874 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3875 {
3876 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3877 }
3878
3879 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3880 int ret_nodeid, int rv)
3881 {
3882 struct dlm_rsb *r = &ls->ls_stub_rsb;
3883 struct dlm_message *ms;
3884 struct dlm_mhandle *mh;
3885 int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3886
3887 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3888 if (error)
3889 goto out;
3890
3891 ms->m_lkid = ms_in->m_lkid;
3892 ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3893 ms->m_nodeid = cpu_to_le32(ret_nodeid);
3894
3895 error = send_message(mh, ms);
3896 out:
3897 return error;
3898 }
3899
3900 /* which args we save from a received message depends heavily on the type
3901 of message, unlike the send side where we can safely send everything about
3902 the lkb for any type of message */
3903
3904 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3905 {
3906 lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3907 lkb->lkb_sbflags = le32_to_cpu(ms->m_sbflags);
3908 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3909 (le32_to_cpu(ms->m_flags) & 0x0000FFFF);
3910 }
3911
3912 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3913 {
3914 if (ms->m_flags == cpu_to_le32(DLM_IFL_STUB_MS))
3915 return;
3916
3917 lkb->lkb_sbflags = le32_to_cpu(ms->m_sbflags);
3918 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3919 (le32_to_cpu(ms->m_flags) & 0x0000FFFF);
3920 }
3921
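/* length of the variable-length data (resource name or lvb) that follows
   the fixed-size message */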
3922 static int receive_extralen(struct dlm_message *ms)
3923 {
3924 return (le16_to_cpu(ms->m_header.h_length) -
3925 sizeof(struct dlm_message));
3926 }
3927
3928 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3929 struct dlm_message *ms)
3930 {
3931 int len;
3932
3933 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3934 if (!lkb->lkb_lvbptr)
3935 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3936 if (!lkb->lkb_lvbptr)
3937 return -ENOMEM;
3938 len = receive_extralen(ms);
3939 if (len > ls->ls_lvblen)
3940 len = ls->ls_lvblen;
3941 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3942 }
3943 return 0;
3944 }
3945
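/* placeholder callbacks stored in master-copy lkbs; the real ast/bast
   functions exist only on the owning node, these just record that the
   owner requested callbacks and should never be invoked here */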
3946 static void fake_bastfn(void *astparam, int mode)
3947 {
3948 log_print("fake_bastfn should not be called");
3949 }
3950
3951 static void fake_astfn(void *astparam)
3952 {
3953 log_print("fake_astfn should not be called");
3954 }
3955
3956 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3957 struct dlm_message *ms)
3958 {
3959 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3960 lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3961 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3962 lkb->lkb_grmode = DLM_LOCK_IV;
3963 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3964
3965 lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3966 lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3967
3968 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3969 /* lkb was just created so there won't be an lvb yet */
3970 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3971 if (!lkb->lkb_lvbptr)
3972 return -ENOMEM;
3973 }
3974
3975 return 0;
3976 }
3977
3978 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3979 struct dlm_message *ms)
3980 {
3981 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3982 return -EBUSY;
3983
3984 if (receive_lvb(ls, lkb, ms))
3985 return -ENOMEM;
3986
3987 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3988 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3989
3990 return 0;
3991 }
3992
3993 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3994 struct dlm_message *ms)
3995 {
3996 if (receive_lvb(ls, lkb, ms))
3997 return -ENOMEM;
3998 return 0;
3999 }
4000
4001 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
4002 uses to send a reply and that the remote end uses to process the reply. */
4003
4004 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
4005 {
4006 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
4007 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4008 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4009 }
4010
4011 /* This is called after the rsb is locked so that we can safely inspect
4012 fields in the lkb. */
4013
4014 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
4015 {
4016 int from = le32_to_cpu(ms->m_header.h_nodeid);
4017 int error = 0;
4018
4019 	/* currently mixing of user/kernel locks is not supported */
4020 if (ms->m_flags & cpu_to_le32(DLM_IFL_USER) &&
4021 ~lkb->lkb_flags & DLM_IFL_USER) {
4022 log_error(lkb->lkb_resource->res_ls,
4023 "got user dlm message for a kernel lock");
4024 error = -EINVAL;
4025 goto out;
4026 }
4027
4028 switch (ms->m_type) {
4029 case cpu_to_le32(DLM_MSG_CONVERT):
4030 case cpu_to_le32(DLM_MSG_UNLOCK):
4031 case cpu_to_le32(DLM_MSG_CANCEL):
4032 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
4033 error = -EINVAL;
4034 break;
4035
4036 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4037 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4038 case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4039 case cpu_to_le32(DLM_MSG_GRANT):
4040 case cpu_to_le32(DLM_MSG_BAST):
4041 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
4042 error = -EINVAL;
4043 break;
4044
4045 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4046 if (!is_process_copy(lkb))
4047 error = -EINVAL;
4048 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
4049 error = -EINVAL;
4050 break;
4051
4052 default:
4053 error = -EINVAL;
4054 }
4055
4056 out:
4057 if (error)
4058 log_error(lkb->lkb_resource->res_ls,
4059 "ignore invalid message %d from %d %x %x %x %d",
4060 le32_to_cpu(ms->m_type), from, lkb->lkb_id,
4061 lkb->lkb_remid, lkb->lkb_flags, lkb->lkb_nodeid);
4062 return error;
4063 }
4064
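/* Resend a remove message for a resource name we no longer have, in case
   an earlier send_remove to the directory node was lost (see the
   receive_request fail path below). The name is published in
   ls_remove_name while the message is built and cleared again afterwards,
   waking ls_remove_wait. */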
4065 static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
4066 {
4067 char name[DLM_RESNAME_MAXLEN + 1];
4068 struct dlm_message *ms;
4069 struct dlm_mhandle *mh;
4070 struct dlm_rsb *r;
4071 uint32_t hash, b;
4072 int rv, dir_nodeid;
4073
4074 memset(name, 0, sizeof(name));
4075 memcpy(name, ms_name, len);
4076
4077 hash = jhash(name, len, 0);
4078 b = hash & (ls->ls_rsbtbl_size - 1);
4079
4080 dir_nodeid = dlm_hash2nodeid(ls, hash);
4081
4082 log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);
4083
4084 spin_lock(&ls->ls_rsbtbl[b].lock);
4085 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4086 if (!rv) {
4087 spin_unlock(&ls->ls_rsbtbl[b].lock);
4088 log_error(ls, "repeat_remove on keep %s", name);
4089 return;
4090 }
4091
4092 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4093 if (!rv) {
4094 spin_unlock(&ls->ls_rsbtbl[b].lock);
4095 log_error(ls, "repeat_remove on toss %s", name);
4096 return;
4097 }
4098
4099 /* use ls->remove_name2 to avoid conflict with shrink? */
4100
4101 spin_lock(&ls->ls_remove_spin);
4102 ls->ls_remove_len = len;
4103 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
4104 spin_unlock(&ls->ls_remove_spin);
4105 spin_unlock(&ls->ls_rsbtbl[b].lock);
4106
4107 rv = _create_message(ls, sizeof(struct dlm_message) + len,
4108 dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
4109 if (rv)
4110 goto out;
4111
4112 memcpy(ms->m_extra, name, len);
4113 ms->m_hash = cpu_to_le32(hash);
4114
4115 send_message(mh, ms);
4116
4117 out:
4118 spin_lock(&ls->ls_remove_spin);
4119 ls->ls_remove_len = 0;
4120 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
4121 spin_unlock(&ls->ls_remove_spin);
4122 wake_up(&ls->ls_remove_wait);
4123 }
4124
4125 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
4126 {
4127 struct dlm_lkb *lkb;
4128 struct dlm_rsb *r;
4129 int from_nodeid;
4130 int error, namelen = 0;
4131
4132 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4133
4134 error = create_lkb(ls, &lkb);
4135 if (error)
4136 goto fail;
4137
4138 receive_flags(lkb, ms);
4139 lkb->lkb_flags |= DLM_IFL_MSTCPY;
4140 error = receive_request_args(ls, lkb, ms);
4141 if (error) {
4142 __put_lkb(ls, lkb);
4143 goto fail;
4144 }
4145
4146 /* The dir node is the authority on whether we are the master
4147 for this rsb or not, so if the master sends us a request, we should
4148 recreate the rsb if we've destroyed it. This race happens when we
4149 send a remove message to the dir node at the same time that the dir
4150 node sends us a request for the rsb. */
4151
4152 namelen = receive_extralen(ms);
4153
4154 error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4155 R_RECEIVE_REQUEST, &r);
4156 if (error) {
4157 __put_lkb(ls, lkb);
4158 goto fail;
4159 }
4160
4161 lock_rsb(r);
4162
4163 if (r->res_master_nodeid != dlm_our_nodeid()) {
4164 error = validate_master_nodeid(ls, r, from_nodeid);
4165 if (error) {
4166 unlock_rsb(r);
4167 put_rsb(r);
4168 __put_lkb(ls, lkb);
4169 goto fail;
4170 }
4171 }
4172
4173 attach_lkb(r, lkb);
4174 error = do_request(r, lkb);
4175 send_request_reply(r, lkb, error);
4176 do_request_effects(r, lkb, error);
4177
4178 unlock_rsb(r);
4179 put_rsb(r);
4180
4181 if (error == -EINPROGRESS)
4182 error = 0;
4183 if (error)
4184 dlm_put_lkb(lkb);
4185 return 0;
4186
4187 fail:
4188 /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4189 and do this receive_request again from process_lookup_list once
4190 	   we get the lookup reply.  This would avoid many repeated
4191 ENOTBLK request failures when the lookup reply designating us
4192 as master is delayed. */
4193
4194 /* We could repeatedly return -EBADR here if our send_remove() is
4195 delayed in being sent/arriving/being processed on the dir node.
4196 	   Another node would repeatedly look up the master, and the dir
4197 node would continue returning our nodeid until our send_remove
4198 took effect.
4199
4200 We send another remove message in case our previous send_remove
4201 was lost/ignored/missed somehow. */
4202
4203 if (error != -ENOTBLK) {
4204 log_limit(ls, "receive_request %x from %d %d",
4205 le32_to_cpu(ms->m_lkid), from_nodeid, error);
4206 }
4207
4208 if (namelen && error == -EBADR) {
4209 send_repeat_remove(ls, ms->m_extra, namelen);
4210 msleep(1000);
4211 }
4212
4213 setup_stub_lkb(ls, ms);
4214 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4215 return error;
4216 }
4217
4218 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
4219 {
4220 struct dlm_lkb *lkb;
4221 struct dlm_rsb *r;
4222 int error, reply = 1;
4223
4224 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4225 if (error)
4226 goto fail;
4227
4228 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4229 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4230 "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4231 (unsigned long long)lkb->lkb_recover_seq,
4232 le32_to_cpu(ms->m_header.h_nodeid),
4233 le32_to_cpu(ms->m_lkid));
4234 error = -ENOENT;
4235 dlm_put_lkb(lkb);
4236 goto fail;
4237 }
4238
4239 r = lkb->lkb_resource;
4240
4241 hold_rsb(r);
4242 lock_rsb(r);
4243
4244 error = validate_message(lkb, ms);
4245 if (error)
4246 goto out;
4247
4248 receive_flags(lkb, ms);
4249
4250 error = receive_convert_args(ls, lkb, ms);
4251 if (error) {
4252 send_convert_reply(r, lkb, error);
4253 goto out;
4254 }
4255
4256 reply = !down_conversion(lkb);
4257
4258 error = do_convert(r, lkb);
4259 if (reply)
4260 send_convert_reply(r, lkb, error);
4261 do_convert_effects(r, lkb, error);
4262 out:
4263 unlock_rsb(r);
4264 put_rsb(r);
4265 dlm_put_lkb(lkb);
4266 return 0;
4267
4268 fail:
4269 setup_stub_lkb(ls, ms);
4270 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4271 return error;
4272 }
4273
4274 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
4275 {
4276 struct dlm_lkb *lkb;
4277 struct dlm_rsb *r;
4278 int error;
4279
4280 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4281 if (error)
4282 goto fail;
4283
4284 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4285 log_error(ls, "receive_unlock %x remid %x remote %d %x",
4286 lkb->lkb_id, lkb->lkb_remid,
4287 le32_to_cpu(ms->m_header.h_nodeid),
4288 le32_to_cpu(ms->m_lkid));
4289 error = -ENOENT;
4290 dlm_put_lkb(lkb);
4291 goto fail;
4292 }
4293
4294 r = lkb->lkb_resource;
4295
4296 hold_rsb(r);
4297 lock_rsb(r);
4298
4299 error = validate_message(lkb, ms);
4300 if (error)
4301 goto out;
4302
4303 receive_flags(lkb, ms);
4304
4305 error = receive_unlock_args(ls, lkb, ms);
4306 if (error) {
4307 send_unlock_reply(r, lkb, error);
4308 goto out;
4309 }
4310
4311 error = do_unlock(r, lkb);
4312 send_unlock_reply(r, lkb, error);
4313 do_unlock_effects(r, lkb, error);
4314 out:
4315 unlock_rsb(r);
4316 put_rsb(r);
4317 dlm_put_lkb(lkb);
4318 return 0;
4319
4320 fail:
4321 setup_stub_lkb(ls, ms);
4322 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4323 return error;
4324 }
4325
4326 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
4327 {
4328 struct dlm_lkb *lkb;
4329 struct dlm_rsb *r;
4330 int error;
4331
4332 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4333 if (error)
4334 goto fail;
4335
4336 receive_flags(lkb, ms);
4337
4338 r = lkb->lkb_resource;
4339
4340 hold_rsb(r);
4341 lock_rsb(r);
4342
4343 error = validate_message(lkb, ms);
4344 if (error)
4345 goto out;
4346
4347 error = do_cancel(r, lkb);
4348 send_cancel_reply(r, lkb, error);
4349 do_cancel_effects(r, lkb, error);
4350 out:
4351 unlock_rsb(r);
4352 put_rsb(r);
4353 dlm_put_lkb(lkb);
4354 return 0;
4355
4356 fail:
4357 setup_stub_lkb(ls, ms);
4358 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4359 return error;
4360 }
4361
4362 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
4363 {
4364 struct dlm_lkb *lkb;
4365 struct dlm_rsb *r;
4366 int error;
4367
4368 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4369 if (error)
4370 return error;
4371
4372 r = lkb->lkb_resource;
4373
4374 hold_rsb(r);
4375 lock_rsb(r);
4376
4377 error = validate_message(lkb, ms);
4378 if (error)
4379 goto out;
4380
4381 receive_flags_reply(lkb, ms);
4382 if (is_altmode(lkb))
4383 munge_altmode(lkb, ms);
4384 grant_lock_pc(r, lkb, ms);
4385 queue_cast(r, lkb, 0);
4386 out:
4387 unlock_rsb(r);
4388 put_rsb(r);
4389 dlm_put_lkb(lkb);
4390 return 0;
4391 }
4392
4393 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
4394 {
4395 struct dlm_lkb *lkb;
4396 struct dlm_rsb *r;
4397 int error;
4398
4399 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4400 if (error)
4401 return error;
4402
4403 r = lkb->lkb_resource;
4404
4405 hold_rsb(r);
4406 lock_rsb(r);
4407
4408 error = validate_message(lkb, ms);
4409 if (error)
4410 goto out;
4411
4412 queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4413 lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4414 out:
4415 unlock_rsb(r);
4416 put_rsb(r);
4417 dlm_put_lkb(lkb);
4418 return 0;
4419 }
4420
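/* runs on the directory node: resolve the resource name to a master
   nodeid and send a lookup reply; if the lookup says we are the master,
   handle the message as a request instead */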
4421 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
4422 {
4423 int len, error, ret_nodeid, from_nodeid, our_nodeid;
4424
4425 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4426 our_nodeid = dlm_our_nodeid();
4427
4428 len = receive_extralen(ms);
4429
4430 error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4431 &ret_nodeid, NULL);
4432
4433 /* Optimization: we're master so treat lookup as a request */
4434 if (!error && ret_nodeid == our_nodeid) {
4435 receive_request(ls, ms);
4436 return;
4437 }
4438 send_lookup_reply(ls, ms, ret_nodeid, error);
4439 }
4440
4441 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
4442 {
4443 char name[DLM_RESNAME_MAXLEN+1];
4444 struct dlm_rsb *r;
4445 uint32_t hash, b;
4446 int rv, len, dir_nodeid, from_nodeid;
4447
4448 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4449
4450 len = receive_extralen(ms);
4451
4452 if (len > DLM_RESNAME_MAXLEN) {
4453 log_error(ls, "receive_remove from %d bad len %d",
4454 from_nodeid, len);
4455 return;
4456 }
4457
4458 dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4459 if (dir_nodeid != dlm_our_nodeid()) {
4460 log_error(ls, "receive_remove from %d bad nodeid %d",
4461 from_nodeid, dir_nodeid);
4462 return;
4463 }
4464
4465 	/* Look for name on rsbtbl.toss; if it's there, kill it.
4466 If it's on rsbtbl.keep, it's being used, and we should ignore this
4467 message. This is an expected race between the dir node sending a
4468 request to the master node at the same time as the master node sends
4469 a remove to the dir node. The resolution to that race is for the
4470 dir node to ignore the remove message, and the master node to
4471 recreate the master rsb when it gets a request from the dir node for
4472 an rsb it doesn't have. */
4473
4474 memset(name, 0, sizeof(name));
4475 memcpy(name, ms->m_extra, len);
4476
4477 hash = jhash(name, len, 0);
4478 b = hash & (ls->ls_rsbtbl_size - 1);
4479
4480 spin_lock(&ls->ls_rsbtbl[b].lock);
4481
4482 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4483 if (rv) {
4484 /* verify the rsb is on keep list per comment above */
4485 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4486 if (rv) {
4487 /* should not happen */
4488 log_error(ls, "receive_remove from %d not found %s",
4489 from_nodeid, name);
4490 spin_unlock(&ls->ls_rsbtbl[b].lock);
4491 return;
4492 }
4493 if (r->res_master_nodeid != from_nodeid) {
4494 /* should not happen */
4495 log_error(ls, "receive_remove keep from %d master %d",
4496 from_nodeid, r->res_master_nodeid);
4497 dlm_print_rsb(r);
4498 spin_unlock(&ls->ls_rsbtbl[b].lock);
4499 return;
4500 }
4501
4502 log_debug(ls, "receive_remove from %d master %d first %x %s",
4503 from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4504 name);
4505 spin_unlock(&ls->ls_rsbtbl[b].lock);
4506 return;
4507 }
4508
4509 if (r->res_master_nodeid != from_nodeid) {
4510 log_error(ls, "receive_remove toss from %d master %d",
4511 from_nodeid, r->res_master_nodeid);
4512 dlm_print_rsb(r);
4513 spin_unlock(&ls->ls_rsbtbl[b].lock);
4514 return;
4515 }
4516
4517 if (kref_put(&r->res_ref, kill_rsb)) {
4518 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4519 spin_unlock(&ls->ls_rsbtbl[b].lock);
4520 dlm_free_rsb(r);
4521 } else {
4522 log_error(ls, "receive_remove from %d rsb ref error",
4523 from_nodeid);
4524 dlm_print_rsb(r);
4525 spin_unlock(&ls->ls_rsbtbl[b].lock);
4526 }
4527 }
4528
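/* purge request from another node; presumably asks us to drop locks
   owned by the given pid on that node (see do_purge) */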
4529 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
4530 {
4531 do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4532 }
4533
4534 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
4535 {
4536 struct dlm_lkb *lkb;
4537 struct dlm_rsb *r;
4538 int error, mstype, result;
4539 int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4540
4541 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4542 if (error)
4543 return error;
4544
4545 r = lkb->lkb_resource;
4546 hold_rsb(r);
4547 lock_rsb(r);
4548
4549 error = validate_message(lkb, ms);
4550 if (error)
4551 goto out;
4552
4553 mstype = lkb->lkb_wait_type;
4554 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4555 if (error) {
4556 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4557 lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4558 from_dlm_errno(le32_to_cpu(ms->m_result)));
4559 dlm_dump_rsb(r);
4560 goto out;
4561 }
4562
4563 /* Optimization: the dir node was also the master, so it took our
4564 lookup as a request and sent request reply instead of lookup reply */
4565 if (mstype == DLM_MSG_LOOKUP) {
4566 r->res_master_nodeid = from_nodeid;
4567 r->res_nodeid = from_nodeid;
4568 lkb->lkb_nodeid = from_nodeid;
4569 }
4570
4571 /* this is the value returned from do_request() on the master */
4572 result = from_dlm_errno(le32_to_cpu(ms->m_result));
4573
4574 switch (result) {
4575 case -EAGAIN:
4576 /* request would block (be queued) on remote master */
4577 queue_cast(r, lkb, -EAGAIN);
4578 confirm_master(r, -EAGAIN);
4579 unhold_lkb(lkb); /* undoes create_lkb() */
4580 break;
4581
4582 case -EINPROGRESS:
4583 case 0:
4584 /* request was queued or granted on remote master */
4585 receive_flags_reply(lkb, ms);
4586 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4587 if (is_altmode(lkb))
4588 munge_altmode(lkb, ms);
4589 if (result) {
4590 add_lkb(r, lkb, DLM_LKSTS_WAITING);
4591 add_timeout(lkb);
4592 } else {
4593 grant_lock_pc(r, lkb, ms);
4594 queue_cast(r, lkb, 0);
4595 }
4596 confirm_master(r, result);
4597 break;
4598
4599 case -EBADR:
4600 case -ENOTBLK:
4601 /* find_rsb failed to find rsb or rsb wasn't master */
4602 log_limit(ls, "receive_request_reply %x from %d %d "
4603 "master %d dir %d first %x %s", lkb->lkb_id,
4604 from_nodeid, result, r->res_master_nodeid,
4605 r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4606
4607 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4608 r->res_master_nodeid != dlm_our_nodeid()) {
4609 /* cause _request_lock->set_master->send_lookup */
4610 r->res_master_nodeid = 0;
4611 r->res_nodeid = -1;
4612 lkb->lkb_nodeid = -1;
4613 }
4614
4615 if (is_overlap(lkb)) {
4616 /* we'll ignore error in cancel/unlock reply */
4617 queue_cast_overlap(r, lkb);
4618 confirm_master(r, result);
4619 unhold_lkb(lkb); /* undoes create_lkb() */
4620 } else {
4621 _request_lock(r, lkb);
4622
4623 if (r->res_master_nodeid == dlm_our_nodeid())
4624 confirm_master(r, 0);
4625 }
4626 break;
4627
4628 default:
4629 log_error(ls, "receive_request_reply %x error %d",
4630 lkb->lkb_id, result);
4631 }
4632
4633 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
4634 log_debug(ls, "receive_request_reply %x result %d unlock",
4635 lkb->lkb_id, result);
4636 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4637 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4638 send_unlock(r, lkb);
4639 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
4640 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4641 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4642 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4643 send_cancel(r, lkb);
4644 } else {
4645 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4646 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4647 }
4648 out:
4649 unlock_rsb(r);
4650 put_rsb(r);
4651 dlm_put_lkb(lkb);
4652 return 0;
4653 }
4654
4655 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4656 struct dlm_message *ms)
4657 {
4658 /* this is the value returned from do_convert() on the master */
4659 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4660 case -EAGAIN:
4661 /* convert would block (be queued) on remote master */
4662 queue_cast(r, lkb, -EAGAIN);
4663 break;
4664
4665 case -EDEADLK:
4666 receive_flags_reply(lkb, ms);
4667 revert_lock_pc(r, lkb);
4668 queue_cast(r, lkb, -EDEADLK);
4669 break;
4670
4671 case -EINPROGRESS:
4672 /* convert was queued on remote master */
4673 receive_flags_reply(lkb, ms);
4674 if (is_demoted(lkb))
4675 munge_demoted(lkb);
4676 del_lkb(r, lkb);
4677 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4678 add_timeout(lkb);
4679 break;
4680
4681 case 0:
4682 /* convert was granted on remote master */
4683 receive_flags_reply(lkb, ms);
4684 if (is_demoted(lkb))
4685 munge_demoted(lkb);
4686 grant_lock_pc(r, lkb, ms);
4687 queue_cast(r, lkb, 0);
4688 break;
4689
4690 default:
4691 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4692 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4693 le32_to_cpu(ms->m_lkid),
4694 from_dlm_errno(le32_to_cpu(ms->m_result)));
4695 dlm_print_rsb(r);
4696 dlm_print_lkb(lkb);
4697 }
4698 }
4699
4700 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4701 {
4702 struct dlm_rsb *r = lkb->lkb_resource;
4703 int error;
4704
4705 hold_rsb(r);
4706 lock_rsb(r);
4707
4708 error = validate_message(lkb, ms);
4709 if (error)
4710 goto out;
4711
4712 /* stub reply can happen with waiters_mutex held */
4713 error = remove_from_waiters_ms(lkb, ms);
4714 if (error)
4715 goto out;
4716
4717 __receive_convert_reply(r, lkb, ms);
4718 out:
4719 unlock_rsb(r);
4720 put_rsb(r);
4721 }
4722
4723 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
4724 {
4725 struct dlm_lkb *lkb;
4726 int error;
4727
4728 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4729 if (error)
4730 return error;
4731
4732 _receive_convert_reply(lkb, ms);
4733 dlm_put_lkb(lkb);
4734 return 0;
4735 }
4736
4737 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4738 {
4739 struct dlm_rsb *r = lkb->lkb_resource;
4740 int error;
4741
4742 hold_rsb(r);
4743 lock_rsb(r);
4744
4745 error = validate_message(lkb, ms);
4746 if (error)
4747 goto out;
4748
4749 /* stub reply can happen with waiters_mutex held */
4750 error = remove_from_waiters_ms(lkb, ms);
4751 if (error)
4752 goto out;
4753
4754 /* this is the value returned from do_unlock() on the master */
4755
4756 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4757 case -DLM_EUNLOCK:
4758 receive_flags_reply(lkb, ms);
4759 remove_lock_pc(r, lkb);
4760 queue_cast(r, lkb, -DLM_EUNLOCK);
4761 break;
4762 case -ENOENT:
4763 break;
4764 default:
4765 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4766 lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4767 }
4768 out:
4769 unlock_rsb(r);
4770 put_rsb(r);
4771 }
4772
4773 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
4774 {
4775 struct dlm_lkb *lkb;
4776 int error;
4777
4778 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4779 if (error)
4780 return error;
4781
4782 _receive_unlock_reply(lkb, ms);
4783 dlm_put_lkb(lkb);
4784 return 0;
4785 }
4786
4787 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4788 {
4789 struct dlm_rsb *r = lkb->lkb_resource;
4790 int error;
4791
4792 hold_rsb(r);
4793 lock_rsb(r);
4794
4795 error = validate_message(lkb, ms);
4796 if (error)
4797 goto out;
4798
4799 /* stub reply can happen with waiters_mutex held */
4800 error = remove_from_waiters_ms(lkb, ms);
4801 if (error)
4802 goto out;
4803
4804 /* this is the value returned from do_cancel() on the master */
4805
4806 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4807 case -DLM_ECANCEL:
4808 receive_flags_reply(lkb, ms);
4809 revert_lock_pc(r, lkb);
4810 queue_cast(r, lkb, -DLM_ECANCEL);
4811 break;
4812 case 0:
4813 break;
4814 default:
4815 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4816 lkb->lkb_id,
4817 from_dlm_errno(le32_to_cpu(ms->m_result)));
4818 }
4819 out:
4820 unlock_rsb(r);
4821 put_rsb(r);
4822 }
4823
4824 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
4825 {
4826 struct dlm_lkb *lkb;
4827 int error;
4828
4829 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4830 if (error)
4831 return error;
4832
4833 _receive_cancel_reply(lkb, ms);
4834 dlm_put_lkb(lkb);
4835 return 0;
4836 }
4837
4838 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4839 {
4840 struct dlm_lkb *lkb;
4841 struct dlm_rsb *r;
4842 int error, ret_nodeid;
4843 int do_lookup_list = 0;
4844
4845 error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4846 if (error) {
4847 log_error(ls, "%s no lkid %x", __func__,
4848 le32_to_cpu(ms->m_lkid));
4849 return;
4850 }
4851
4852 /* ms->m_result is the value returned by dlm_master_lookup on dir node
4853 FIXME: will a non-zero error ever be returned? */
4854
4855 r = lkb->lkb_resource;
4856 hold_rsb(r);
4857 lock_rsb(r);
4858
4859 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4860 if (error)
4861 goto out;
4862
4863 ret_nodeid = le32_to_cpu(ms->m_nodeid);
4864
4865 /* We sometimes receive a request from the dir node for this
4866 	   rsb before we've received the dir node's lookup_reply for it.
4867 The request from the dir node implies we're the master, so we set
4868 ourself as master in receive_request_reply, and verify here that
4869 we are indeed the master. */
4870
4871 if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4872 /* This should never happen */
4873 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4874 "master %d dir %d our %d first %x %s",
4875 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4876 ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4877 dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4878 }
4879
4880 if (ret_nodeid == dlm_our_nodeid()) {
4881 r->res_master_nodeid = ret_nodeid;
4882 r->res_nodeid = 0;
4883 do_lookup_list = 1;
4884 r->res_first_lkid = 0;
4885 } else if (ret_nodeid == -1) {
4886 /* the remote node doesn't believe it's the dir node */
4887 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4888 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4889 r->res_master_nodeid = 0;
4890 r->res_nodeid = -1;
4891 lkb->lkb_nodeid = -1;
4892 } else {
4893 /* set_master() will set lkb_nodeid from r */
4894 r->res_master_nodeid = ret_nodeid;
4895 r->res_nodeid = ret_nodeid;
4896 }
4897
4898 if (is_overlap(lkb)) {
4899 log_debug(ls, "receive_lookup_reply %x unlock %x",
4900 lkb->lkb_id, lkb->lkb_flags);
4901 queue_cast_overlap(r, lkb);
4902 unhold_lkb(lkb); /* undoes create_lkb() */
4903 goto out_list;
4904 }
4905
4906 _request_lock(r, lkb);
4907
4908 out_list:
4909 if (do_lookup_list)
4910 process_lookup_list(r);
4911 out:
4912 unlock_rsb(r);
4913 put_rsb(r);
4914 dlm_put_lkb(lkb);
4915 }
4916
4917 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4918 uint32_t saved_seq)
4919 {
4920 int error = 0, noent = 0;
4921
4922 if (!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid))) {
4923 log_limit(ls, "receive %d from non-member %d %x %x %d",
4924 le32_to_cpu(ms->m_type),
4925 le32_to_cpu(ms->m_header.h_nodeid),
4926 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4927 from_dlm_errno(le32_to_cpu(ms->m_result)));
4928 return;
4929 }
4930
4931 switch (ms->m_type) {
4932
4933 /* messages sent to a master node */
4934
4935 case cpu_to_le32(DLM_MSG_REQUEST):
4936 error = receive_request(ls, ms);
4937 break;
4938
4939 case cpu_to_le32(DLM_MSG_CONVERT):
4940 error = receive_convert(ls, ms);
4941 break;
4942
4943 case cpu_to_le32(DLM_MSG_UNLOCK):
4944 error = receive_unlock(ls, ms);
4945 break;
4946
4947 case cpu_to_le32(DLM_MSG_CANCEL):
4948 noent = 1;
4949 error = receive_cancel(ls, ms);
4950 break;
4951
4952 /* messages sent from a master node (replies to above) */
4953
4954 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4955 error = receive_request_reply(ls, ms);
4956 break;
4957
4958 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4959 error = receive_convert_reply(ls, ms);
4960 break;
4961
4962 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4963 error = receive_unlock_reply(ls, ms);
4964 break;
4965
4966 case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4967 error = receive_cancel_reply(ls, ms);
4968 break;
4969
4970 /* messages sent from a master node (only two types of async msg) */
4971
4972 case cpu_to_le32(DLM_MSG_GRANT):
4973 noent = 1;
4974 error = receive_grant(ls, ms);
4975 break;
4976
4977 case cpu_to_le32(DLM_MSG_BAST):
4978 noent = 1;
4979 error = receive_bast(ls, ms);
4980 break;
4981
4982 /* messages sent to a dir node */
4983
4984 case cpu_to_le32(DLM_MSG_LOOKUP):
4985 receive_lookup(ls, ms);
4986 break;
4987
4988 case cpu_to_le32(DLM_MSG_REMOVE):
4989 receive_remove(ls, ms);
4990 break;
4991
4992 /* messages sent from a dir node (remove has no reply) */
4993
4994 case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4995 receive_lookup_reply(ls, ms);
4996 break;
4997
4998 /* other messages */
4999
5000 case cpu_to_le32(DLM_MSG_PURGE):
5001 receive_purge(ls, ms);
5002 break;
5003
5004 default:
5005 log_error(ls, "unknown message type %d",
5006 le32_to_cpu(ms->m_type));
5007 }
5008
5009 /*
5010 * When checking for ENOENT, we're checking the result of
5011 * find_lkb(m_remid):
5012 *
5013 * The lock id referenced in the message wasn't found. This may
5014 * happen in normal usage for the async messages and cancel, so
5015 * only use log_debug for them.
5016 *
5017 * Some errors are expected and normal.
5018 */
5019
5020 if (error == -ENOENT && noent) {
5021 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
5022 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
5023 le32_to_cpu(ms->m_header.h_nodeid),
5024 le32_to_cpu(ms->m_lkid), saved_seq);
5025 } else if (error == -ENOENT) {
5026 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
5027 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
5028 le32_to_cpu(ms->m_header.h_nodeid),
5029 le32_to_cpu(ms->m_lkid), saved_seq);
5030
5031 if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
5032 dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
5033 }
5034
5035 if (error == -EINVAL) {
5036 log_error(ls, "receive %d inval from %d lkid %x remid %x "
5037 "saved_seq %u",
5038 le32_to_cpu(ms->m_type),
5039 le32_to_cpu(ms->m_header.h_nodeid),
5040 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
5041 saved_seq);
5042 }
5043 }
5044
5045 /* If the lockspace is in recovery mode (locking stopped), then normal
5046 messages are saved on the requestqueue for processing after recovery is
5047 done. When not in recovery mode, we wait for dlm_recoverd to drain saved
5048 messages off the requestqueue before we process new ones. This occurs right
5049 after recovery completes when we transition from saving all messages on
5050 requestqueue, to processing all the saved messages, to processing new
5051 messages as they arrive. */
5052
5053 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
5054 int nodeid)
5055 {
5056 if (dlm_locking_stopped(ls)) {
5057 /* If we were a member of this lockspace, left, and rejoined,
5058 other nodes may still be sending us messages from the
5059 lockspace generation before we left. */
5060 if (!ls->ls_generation) {
5061 log_limit(ls, "receive %d from %d ignore old gen",
5062 le32_to_cpu(ms->m_type), nodeid);
5063 return;
5064 }
5065
5066 dlm_add_requestqueue(ls, nodeid, ms);
5067 } else {
5068 dlm_wait_requestqueue(ls);
5069 _receive_message(ls, ms, 0);
5070 }
5071 }
5072
5073 /* This is called by dlm_recoverd to process messages that were saved on
5074 the requestqueue. */
5075
5076 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
5077 uint32_t saved_seq)
5078 {
5079 _receive_message(ls, ms, saved_seq);
5080 }
5081
5082 /* This is called by the midcomms layer when something is received for
5083 the lockspace. It could be either a MSG (normal message sent as part of
5084 standard locking activity) or an RCOM (recovery message sent as part of
5085 lockspace recovery). */
5086
5087 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
5088 {
5089 struct dlm_header *hd = &p->header;
5090 struct dlm_ls *ls;
5091 int type = 0;
5092
5093 switch (hd->h_cmd) {
5094 case DLM_MSG:
5095 type = le32_to_cpu(p->message.m_type);
5096 break;
5097 case DLM_RCOM:
5098 type = le32_to_cpu(p->rcom.rc_type);
5099 break;
5100 default:
5101 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
5102 return;
5103 }
5104
5105 if (le32_to_cpu(hd->h_nodeid) != nodeid) {
5106 log_print("invalid h_nodeid %d from %d lockspace %x",
5107 le32_to_cpu(hd->h_nodeid), nodeid,
5108 le32_to_cpu(hd->u.h_lockspace));
5109 return;
5110 }
5111
5112 ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
5113 if (!ls) {
5114 if (dlm_config.ci_log_debug) {
5115 printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
5116 "%u from %d cmd %d type %d\n",
5117 le32_to_cpu(hd->u.h_lockspace), nodeid,
5118 hd->h_cmd, type);
5119 }
5120
5121 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
5122 dlm_send_ls_not_ready(nodeid, &p->rcom);
5123 return;
5124 }
5125
5126 /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
5127 be inactive (in this ls) before transitioning to recovery mode */
5128
5129 down_read(&ls->ls_recv_active);
5130 if (hd->h_cmd == DLM_MSG)
5131 dlm_receive_message(ls, &p->message, nodeid);
5132 else
5133 dlm_receive_rcom(ls, &p->rcom, nodeid);
5134 up_read(&ls->ls_recv_active);
5135
5136 dlm_put_lockspace(ls);
5137 }
5138
5139 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5140 struct dlm_message *ms_stub)
5141 {
5142 if (middle_conversion(lkb)) {
5143 hold_lkb(lkb);
5144 memset(ms_stub, 0, sizeof(struct dlm_message));
5145 ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
5146 ms_stub->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
5147 ms_stub->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
5148 ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5149 _receive_convert_reply(lkb, ms_stub);
5150
5151 /* Same special case as in receive_rcom_lock_args() */
5152 lkb->lkb_grmode = DLM_LOCK_IV;
5153 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5154 unhold_lkb(lkb);
5155
5156 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5157 lkb->lkb_flags |= DLM_IFL_RESEND;
5158 }
5159
5160 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5161 conversions are async; there's no reply from the remote master */
5162 }
5163
5164 /* A waiting lkb needs recovery if the master node has failed, or
5165 the master node is changing (only when no directory is used) */
5166
5167 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5168 int dir_nodeid)
5169 {
5170 if (dlm_no_directory(ls))
5171 return 1;
5172
5173 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5174 return 1;
5175
5176 return 0;
5177 }
5178
5179 /* Recovery for locks that are waiting for replies from nodes that are now
5180 gone. We can just complete unlocks and cancels by faking a reply from the
5181 dead node. Requests and up-conversions we flag to be resent after
5182 recovery. Down-conversions can just be completed with a fake reply like
5183 unlocks. Conversions between PR and CW need special attention. */
5184
5185 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5186 {
5187 struct dlm_lkb *lkb, *safe;
5188 struct dlm_message *ms_stub;
5189 int wait_type, stub_unlock_result, stub_cancel_result;
5190 int dir_nodeid;
5191
5192 ms_stub = kmalloc(sizeof(*ms_stub), GFP_KERNEL);
5193 if (!ms_stub)
5194 return;
5195
5196 mutex_lock(&ls->ls_waiters_mutex);
5197
5198 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5199
5200 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5201
5202 /* exclude debug messages about unlocks because there can be so
5203 many and they aren't very interesting */
5204
5205 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5206 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5207 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5208 lkb->lkb_id,
5209 lkb->lkb_remid,
5210 lkb->lkb_wait_type,
5211 lkb->lkb_resource->res_nodeid,
5212 lkb->lkb_nodeid,
5213 lkb->lkb_wait_nodeid,
5214 dir_nodeid);
5215 }
5216
5217 		/* all outstanding lookups, regardless of destination, will be
5218 resent after recovery is done */
5219
5220 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5221 lkb->lkb_flags |= DLM_IFL_RESEND;
5222 continue;
5223 }
5224
5225 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5226 continue;
5227
5228 wait_type = lkb->lkb_wait_type;
5229 stub_unlock_result = -DLM_EUNLOCK;
5230 stub_cancel_result = -DLM_ECANCEL;
5231
5232 /* Main reply may have been received leaving a zero wait_type,
5233 but a reply for the overlapping op may not have been
5234 received. In that case we need to fake the appropriate
5235 reply for the overlap op. */
5236
5237 if (!wait_type) {
5238 if (is_overlap_cancel(lkb)) {
5239 wait_type = DLM_MSG_CANCEL;
5240 if (lkb->lkb_grmode == DLM_LOCK_IV)
5241 stub_cancel_result = 0;
5242 }
5243 if (is_overlap_unlock(lkb)) {
5244 wait_type = DLM_MSG_UNLOCK;
5245 if (lkb->lkb_grmode == DLM_LOCK_IV)
5246 stub_unlock_result = -ENOENT;
5247 }
5248
5249 log_debug(ls, "rwpre overlap %x %x %d %d %d",
5250 lkb->lkb_id, lkb->lkb_flags, wait_type,
5251 stub_cancel_result, stub_unlock_result);
5252 }
5253
5254 switch (wait_type) {
5255
5256 case DLM_MSG_REQUEST:
5257 lkb->lkb_flags |= DLM_IFL_RESEND;
5258 break;
5259
5260 case DLM_MSG_CONVERT:
5261 recover_convert_waiter(ls, lkb, ms_stub);
5262 break;
5263
5264 case DLM_MSG_UNLOCK:
5265 hold_lkb(lkb);
5266 memset(ms_stub, 0, sizeof(struct dlm_message));
5267 ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
5268 ms_stub->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
5269 ms_stub->m_result = cpu_to_le32(to_dlm_errno(stub_unlock_result));
5270 ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5271 _receive_unlock_reply(lkb, ms_stub);
5272 dlm_put_lkb(lkb);
5273 break;
5274
5275 case DLM_MSG_CANCEL:
5276 hold_lkb(lkb);
5277 memset(ms_stub, 0, sizeof(struct dlm_message));
5278 ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
5279 ms_stub->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
5280 ms_stub->m_result = cpu_to_le32(to_dlm_errno(stub_cancel_result));
5281 ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5282 _receive_cancel_reply(lkb, ms_stub);
5283 dlm_put_lkb(lkb);
5284 break;
5285
5286 default:
5287 log_error(ls, "invalid lkb wait_type %d %d",
5288 lkb->lkb_wait_type, wait_type);
5289 }
5290 schedule();
5291 }
5292 mutex_unlock(&ls->ls_waiters_mutex);
5293 kfree(ms_stub);
5294 }
5295
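/* return the first waiter marked RESEND by dlm_recover_waiters_pre,
   with a reference held, or NULL if none remain */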
5296 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5297 {
5298 struct dlm_lkb *lkb = NULL, *iter;
5299
5300 mutex_lock(&ls->ls_waiters_mutex);
5301 list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5302 if (iter->lkb_flags & DLM_IFL_RESEND) {
5303 hold_lkb(iter);
5304 lkb = iter;
5305 break;
5306 }
5307 }
5308 mutex_unlock(&ls->ls_waiters_mutex);
5309
5310 return lkb;
5311 }
5312
5313 /* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
5314 master or dir-node for r. Processing the lkb may result in it being placed
5315 back on waiters. */
5316
5317 /* We do this after normal locking has been enabled and any saved messages
5318 (in requestqueue) have been processed. We should be confident that at
5319 this point we won't get or process a reply to any of these waiting
5320 operations. But, new ops may be coming in on the rsbs/locks here from
5321 userspace or remotely. */
5322
5323 /* there may have been an overlap unlock/cancel prior to recovery or after
5324    recovery. if before, the lkb may still have a positive wait_count; if after,
5325    the overlap flag would just have been set and nothing new sent. we can be
5326    confident here that any replies to either the initial op or overlap ops
5327 prior to recovery have been received. */
5328
5329 int dlm_recover_waiters_post(struct dlm_ls *ls)
5330 {
5331 struct dlm_lkb *lkb;
5332 struct dlm_rsb *r;
5333 int error = 0, mstype, err, oc, ou;
5334
5335 while (1) {
5336 if (dlm_locking_stopped(ls)) {
5337 log_debug(ls, "recover_waiters_post aborted");
5338 error = -EINTR;
5339 break;
5340 }
5341
5342 lkb = find_resend_waiter(ls);
5343 if (!lkb)
5344 break;
5345
5346 r = lkb->lkb_resource;
5347 hold_rsb(r);
5348 lock_rsb(r);
5349
5350 mstype = lkb->lkb_wait_type;
5351 oc = is_overlap_cancel(lkb);
5352 ou = is_overlap_unlock(lkb);
5353 err = 0;
5354
5355 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5356 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5357 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5358 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5359 dlm_dir_nodeid(r), oc, ou);
5360
5361 /* At this point we assume that we won't get a reply to any
5362 previous op or overlap op on this lock. First, do a big
5363 remove_from_waiters() for all previous ops. */
5364
5365 lkb->lkb_flags &= ~DLM_IFL_RESEND;
5366 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
5367 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
5368 lkb->lkb_wait_type = 0;
5369 		/* drop all wait_count references; we still
5370 		 * hold a reference for this iteration.
5371 		 */
5372 while (lkb->lkb_wait_count) {
5373 lkb->lkb_wait_count--;
5374 unhold_lkb(lkb);
5375 }
5376 mutex_lock(&ls->ls_waiters_mutex);
5377 list_del_init(&lkb->lkb_wait_reply);
5378 mutex_unlock(&ls->ls_waiters_mutex);
5379
5380 if (oc || ou) {
5381 /* do an unlock or cancel instead of resending */
5382 switch (mstype) {
5383 case DLM_MSG_LOOKUP:
5384 case DLM_MSG_REQUEST:
5385 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5386 -DLM_ECANCEL);
5387 unhold_lkb(lkb); /* undoes create_lkb() */
5388 break;
5389 case DLM_MSG_CONVERT:
5390 if (oc) {
5391 queue_cast(r, lkb, -DLM_ECANCEL);
5392 } else {
5393 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5394 _unlock_lock(r, lkb);
5395 }
5396 break;
5397 default:
5398 err = 1;
5399 }
5400 } else {
5401 switch (mstype) {
5402 case DLM_MSG_LOOKUP:
5403 case DLM_MSG_REQUEST:
5404 _request_lock(r, lkb);
5405 if (is_master(r))
5406 confirm_master(r, 0);
5407 break;
5408 case DLM_MSG_CONVERT:
5409 _convert_lock(r, lkb);
5410 break;
5411 default:
5412 err = 1;
5413 }
5414 }
5415
5416 if (err) {
5417 log_error(ls, "waiter %x msg %d r_nodeid %d "
5418 "dir_nodeid %d overlap %d %d",
5419 lkb->lkb_id, mstype, r->res_nodeid,
5420 dlm_dir_nodeid(r), oc, ou);
5421 }
5422 unlock_rsb(r);
5423 put_rsb(r);
5424 dlm_put_lkb(lkb);
5425 }
5426
5427 return error;
5428 }
5429
5430 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5431 struct list_head *list)
5432 {
5433 struct dlm_lkb *lkb, *safe;
5434
5435 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5436 if (!is_master_copy(lkb))
5437 continue;
5438
5439 /* don't purge lkbs we've added in recover_master_copy for
5440 the current recovery seq */
5441
5442 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5443 continue;
5444
5445 del_lkb(r, lkb);
5446
5447 /* this put should free the lkb */
5448 if (!dlm_put_lkb(lkb))
5449 log_error(ls, "purged mstcpy lkb not released");
5450 }
5451 }
5452
5453 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5454 {
5455 struct dlm_ls *ls = r->res_ls;
5456
5457 purge_mstcpy_list(ls, r, &r->res_grantqueue);
5458 purge_mstcpy_list(ls, r, &r->res_convertqueue);
5459 purge_mstcpy_list(ls, r, &r->res_waitqueue);
5460 }
5461
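/* remove master-copy lkbs on this queue that were held by nodes no
   longer in the lockspace, counting how many were purged */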
5462 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5463 struct list_head *list,
5464 int nodeid_gone, unsigned int *count)
5465 {
5466 struct dlm_lkb *lkb, *safe;
5467
5468 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5469 if (!is_master_copy(lkb))
5470 continue;
5471
5472 if ((lkb->lkb_nodeid == nodeid_gone) ||
5473 dlm_is_removed(ls, lkb->lkb_nodeid)) {
5474
5475 /* tell recover_lvb to invalidate the lvb
5476 because a node holding EX/PW failed */
5477 if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5478 (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5479 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5480 }
5481
5482 del_lkb(r, lkb);
5483
5484 /* this put should free the lkb */
5485 if (!dlm_put_lkb(lkb))
5486 log_error(ls, "purged dead lkb not released");
5487
5488 rsb_set_flag(r, RSB_RECOVER_GRANT);
5489
5490 (*count)++;
5491 }
5492 }
5493 }
5494
5495 /* Get rid of locks held by nodes that are gone. */
5496
5497 void dlm_recover_purge(struct dlm_ls *ls)
5498 {
5499 struct dlm_rsb *r;
5500 struct dlm_member *memb;
5501 int nodes_count = 0;
5502 int nodeid_gone = 0;
5503 unsigned int lkb_count = 0;
5504
5505 /* cache one removed nodeid to optimize the common
5506 case of a single node removed */
5507
5508 list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5509 nodes_count++;
5510 nodeid_gone = memb->nodeid;
5511 }
5512
5513 if (!nodes_count)
5514 return;
5515
5516 down_write(&ls->ls_root_sem);
5517 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5518 hold_rsb(r);
5519 lock_rsb(r);
5520 if (is_master(r)) {
5521 purge_dead_list(ls, r, &r->res_grantqueue,
5522 nodeid_gone, &lkb_count);
5523 purge_dead_list(ls, r, &r->res_convertqueue,
5524 nodeid_gone, &lkb_count);
5525 purge_dead_list(ls, r, &r->res_waitqueue,
5526 nodeid_gone, &lkb_count);
5527 }
5528 unlock_rsb(r);
5529 unhold_rsb(r);
5530 cond_resched();
5531 }
5532 up_write(&ls->ls_root_sem);
5533
5534 if (lkb_count)
5535 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5536 lkb_count, nodes_count);
5537 }
5538
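/* Return (with a hold) the next rsb in the bucket that is flagged
   RECOVER_GRANT and that we master; clear the flag on rsbs we don't
   master. */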
5539 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5540 {
5541 struct rb_node *n;
5542 struct dlm_rsb *r;
5543
5544 spin_lock(&ls->ls_rsbtbl[bucket].lock);
5545 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5546 r = rb_entry(n, struct dlm_rsb, res_hashnode);
5547
5548 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5549 continue;
5550 if (!is_master(r)) {
5551 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5552 continue;
5553 }
5554 hold_rsb(r);
5555 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5556 return r;
5557 }
5558 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5559 return NULL;
5560 }
5561
5562 /*
5563 * Attempt to grant locks on resources that we are the master of.
5564 * Locks may have become grantable during recovery because locks
5565 * from departed nodes have been purged (or not rebuilt), allowing
5566 * previously blocked locks to now be granted. The subset of rsb's
5567 * we are interested in are those with lkb's on either the convert or
5568 * waiting queues.
5569 *
5570 * Simplest would be to go through each master rsb and check for non-empty
5571 * convert or waiting queues, and attempt to grant on those rsbs.
5572 * Checking the queues requires lock_rsb, though, for which we'd need
5573 * to release the rsbtbl lock. This would make iterating through all
5574 * rsb's very inefficient. So, we rely on earlier recovery routines
5575 * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5576 * locks for.
5577 */
5578
5579 void dlm_recover_grant(struct dlm_ls *ls)
5580 {
5581 struct dlm_rsb *r;
5582 int bucket = 0;
5583 unsigned int count = 0;
5584 unsigned int rsb_count = 0;
5585 unsigned int lkb_count = 0;
5586
5587 while (1) {
5588 r = find_grant_rsb(ls, bucket);
5589 if (!r) {
5590 if (bucket == ls->ls_rsbtbl_size - 1)
5591 break;
5592 bucket++;
5593 continue;
5594 }
5595 rsb_count++;
5596 count = 0;
5597 lock_rsb(r);
5598 /* the RECOVER_GRANT flag is checked in the grant path */
5599 grant_pending_locks(r, &count);
5600 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5601 lkb_count += count;
5602 confirm_master(r, 0);
5603 unlock_rsb(r);
5604 put_rsb(r);
5605 cond_resched();
5606 }
5607
5608 if (lkb_count)
5609 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5610 lkb_count, rsb_count);
5611 }
5612
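/* Find an lkb on the given queue by owner nodeid and remote lock id. */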
5613 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5614 uint32_t remid)
5615 {
5616 struct dlm_lkb *lkb;
5617
5618 list_for_each_entry(lkb, head, lkb_statequeue) {
5619 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5620 return lkb;
5621 }
5622 return NULL;
5623 }
5624
5625 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5626 uint32_t remid)
5627 {
5628 struct dlm_lkb *lkb;
5629
5630 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5631 if (lkb)
5632 return lkb;
5633 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5634 if (lkb)
5635 return lkb;
5636 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5637 if (lkb)
5638 return lkb;
5639 return NULL;
5640 }
5641
5642 /* needs at least dlm_rcom + rcom_lock */
5643 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5644 struct dlm_rsb *r, struct dlm_rcom *rc)
5645 {
5646 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5647
5648 lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5649 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5650 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5651 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5652 lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
5653 lkb->lkb_flags |= DLM_IFL_MSTCPY;
5654 lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5655 lkb->lkb_rqmode = rl->rl_rqmode;
5656 lkb->lkb_grmode = rl->rl_grmode;
5657 	/* don't set lkb_status because add_lkb wants to set it itself */
5658
5659 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5660 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5661
5662 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5663 int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5664 sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5665 if (lvblen > ls->ls_lvblen)
5666 return -EINVAL;
5667 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5668 if (!lkb->lkb_lvbptr)
5669 return -ENOMEM;
5670 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5671 }
5672
5673 /* Conversions between PR and CW (middle modes) need special handling.
5674 The real granted mode of these converting locks cannot be determined
5675 until all locks have been rebuilt on the rsb (recover_conversion) */
5676
5677 if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5678 middle_conversion(lkb)) {
5679 rl->rl_status = DLM_LKSTS_CONVERT;
5680 lkb->lkb_grmode = DLM_LOCK_IV;
5681 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5682 }
5683
5684 return 0;
5685 }
5686
5687 /* This lkb may have been recovered in a previous aborted recovery so we need
5688 to check if the rsb already has an lkb with the given remote nodeid/lkid.
5689 If so we just send back a standard reply. If not, we create a new lkb with
5690 the given values and send back our lkid. We send back our lkid by sending
5691 back the rcom_lock struct we got but with the remid field filled in. */
5692
5693 /* needs at least dlm_rcom + rcom_lock */
5694 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5695 {
5696 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5697 struct dlm_rsb *r;
5698 struct dlm_lkb *lkb;
5699 uint32_t remid = 0;
5700 int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5701 int error;
5702
5703 if (rl->rl_parent_lkid) {
5704 error = -EOPNOTSUPP;
5705 goto out;
5706 }
5707
5708 remid = le32_to_cpu(rl->rl_lkid);
5709
5710 /* In general we expect the rsb returned to be R_MASTER, but we don't
5711 have to require it. Recovery of masters on one node can overlap
5712 recovery of locks on another node, so one node can send us MSTCPY
5713 locks before we've made ourselves master of this rsb. We can still
5714 add new MSTCPY locks that we receive here without any harm; when
5715 we make ourselves master, dlm_recover_masters() won't touch the
5716 MSTCPY locks we've received early. */
5717
5718 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5719 from_nodeid, R_RECEIVE_RECOVER, &r);
5720 if (error)
5721 goto out;
5722
5723 lock_rsb(r);
5724
5725 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5726 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5727 from_nodeid, remid);
5728 error = -EBADR;
5729 goto out_unlock;
5730 }
5731
5732 lkb = search_remid(r, from_nodeid, remid);
5733 if (lkb) {
5734 error = -EEXIST;
5735 goto out_remid;
5736 }
5737
5738 error = create_lkb(ls, &lkb);
5739 if (error)
5740 goto out_unlock;
5741
5742 error = receive_rcom_lock_args(ls, lkb, r, rc);
5743 if (error) {
5744 __put_lkb(ls, lkb);
5745 goto out_unlock;
5746 }
5747
5748 attach_lkb(r, lkb);
5749 add_lkb(r, lkb, rl->rl_status);
5750 ls->ls_recover_locks_in++;
5751
5752 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5753 rsb_set_flag(r, RSB_RECOVER_GRANT);
5754
5755 out_remid:
5756 /* this is the new value returned to the lock holder for
5757 saving in its process-copy lkb */
5758 rl->rl_remid = cpu_to_le32(lkb->lkb_id);
5759
5760 lkb->lkb_recover_seq = ls->ls_recover_seq;
5761
5762 out_unlock:
5763 unlock_rsb(r);
5764 put_rsb(r);
5765 out:
5766 if (error && error != -EEXIST)
5767 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5768 from_nodeid, remid, error);
5769 rl->rl_result = cpu_to_le32(error);
5770 return error;
5771 }
5772
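/* Process the reply from the new master for a lock sent by
   dlm_recover_locks(), saving the master's lock id in lkb_remid. */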
5773 /* needs at least dlm_rcom + rcom_lock */
5774 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5775 {
5776 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5777 struct dlm_rsb *r;
5778 struct dlm_lkb *lkb;
5779 uint32_t lkid, remid;
5780 int error, result;
5781
5782 lkid = le32_to_cpu(rl->rl_lkid);
5783 remid = le32_to_cpu(rl->rl_remid);
5784 result = le32_to_cpu(rl->rl_result);
5785
5786 error = find_lkb(ls, lkid, &lkb);
5787 if (error) {
5788 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5789 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5790 result);
5791 return error;
5792 }
5793
5794 r = lkb->lkb_resource;
5795 hold_rsb(r);
5796 lock_rsb(r);
5797
5798 if (!is_process_copy(lkb)) {
5799 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5800 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5801 result);
5802 dlm_dump_rsb(r);
5803 unlock_rsb(r);
5804 put_rsb(r);
5805 dlm_put_lkb(lkb);
5806 return -EINVAL;
5807 }
5808
5809 switch (result) {
5810 case -EBADR:
5811 /* There's a chance the new master received our lock before
5812 	   dlm_recover_master_reply(); this wouldn't happen if we did
5813 a barrier between recover_masters and recover_locks. */
5814
5815 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5816 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5817 result);
5818
5819 dlm_send_rcom_lock(r, lkb);
5820 goto out;
5821 case -EEXIST:
5822 case 0:
5823 lkb->lkb_remid = remid;
5824 break;
5825 default:
5826 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5827 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5828 result);
5829 }
5830
5831 /* an ack for dlm_recover_locks() which waits for replies from
5832 all the locks it sends to new masters */
5833 dlm_recovered_lock(r);
5834 out:
5835 unlock_rsb(r);
5836 put_rsb(r);
5837 dlm_put_lkb(lkb);
5838
5839 return 0;
5840 }
5841
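/* Request a new lock on behalf of a userspace process and add the lkb
   to the process's list of owned locks. */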
5842 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5843 int mode, uint32_t flags, void *name, unsigned int namelen,
5844 unsigned long timeout_cs)
5845 {
5846 struct dlm_lkb *lkb;
5847 struct dlm_args args;
5848 int error;
5849
5850 dlm_lock_recovery(ls);
5851
5852 error = create_lkb(ls, &lkb);
5853 if (error) {
5854 kfree(ua);
5855 goto out;
5856 }
5857
5858 if (flags & DLM_LKF_VALBLK) {
5859 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5860 if (!ua->lksb.sb_lvbptr) {
5861 kfree(ua);
5862 __put_lkb(ls, lkb);
5863 error = -ENOMEM;
5864 goto out;
5865 }
5866 }
5867 error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
5868 fake_astfn, ua, fake_bastfn, &args);
5869 if (error) {
5870 kfree(ua->lksb.sb_lvbptr);
5871 ua->lksb.sb_lvbptr = NULL;
5872 kfree(ua);
5873 __put_lkb(ls, lkb);
5874 goto out;
5875 }
5876
5877 /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5878 When DLM_IFL_USER is set, the dlm knows that this is a userspace
5879 lock and that lkb_astparam is the dlm_user_args structure. */
5880 lkb->lkb_flags |= DLM_IFL_USER;
5881 error = request_lock(ls, lkb, name, namelen, &args);
5882
5883 switch (error) {
5884 case 0:
5885 break;
5886 case -EINPROGRESS:
5887 error = 0;
5888 break;
5889 case -EAGAIN:
5890 error = 0;
5891 fallthrough;
5892 default:
5893 __put_lkb(ls, lkb);
5894 goto out;
5895 }
5896
5897 /* add this new lkb to the per-process list of locks */
5898 spin_lock(&ua->proc->locks_spin);
5899 hold_lkb(lkb);
5900 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5901 spin_unlock(&ua->proc->locks_spin);
5902 out:
5903 dlm_unlock_recovery(ls);
5904 return error;
5905 }
5906
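/* Convert an existing userspace lock; the caller may supply a new lvb
   and updated callback arguments. */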
5907 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5908 int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
5909 unsigned long timeout_cs)
5910 {
5911 struct dlm_lkb *lkb;
5912 struct dlm_args args;
5913 struct dlm_user_args *ua;
5914 int error;
5915
5916 dlm_lock_recovery(ls);
5917
5918 error = find_lkb(ls, lkid, &lkb);
5919 if (error)
5920 goto out;
5921
5922 /* user can change the params on its lock when it converts it, or
5923 add an lvb that didn't exist before */
5924
5925 ua = lkb->lkb_ua;
5926
5927 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5928 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5929 if (!ua->lksb.sb_lvbptr) {
5930 error = -ENOMEM;
5931 goto out_put;
5932 }
5933 }
5934 if (lvb_in && ua->lksb.sb_lvbptr)
5935 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5936
5937 ua->xid = ua_tmp->xid;
5938 ua->castparam = ua_tmp->castparam;
5939 ua->castaddr = ua_tmp->castaddr;
5940 ua->bastparam = ua_tmp->bastparam;
5941 ua->bastaddr = ua_tmp->bastaddr;
5942 ua->user_lksb = ua_tmp->user_lksb;
5943
5944 error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
5945 fake_astfn, ua, fake_bastfn, &args);
5946 if (error)
5947 goto out_put;
5948
5949 error = convert_lock(ls, lkb, &args);
5950
5951 if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5952 error = 0;
5953 out_put:
5954 dlm_put_lkb(lkb);
5955 out:
5956 dlm_unlock_recovery(ls);
5957 kfree(ua_tmp);
5958 return error;
5959 }
5960
5961 /*
5962 * The caller asks for an orphan lock on a given resource with a given mode.
5963 * If a matching lock exists, it's moved to the owner's list of locks and
5964 * the lkid is returned.
5965 */
5966
5967 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5968 int mode, uint32_t flags, void *name, unsigned int namelen,
5969 unsigned long timeout_cs, uint32_t *lkid)
5970 {
5971 struct dlm_lkb *lkb = NULL, *iter;
5972 struct dlm_user_args *ua;
5973 int found_other_mode = 0;
5974 int rv = 0;
5975
5976 mutex_lock(&ls->ls_orphans_mutex);
5977 list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5978 if (iter->lkb_resource->res_length != namelen)
5979 continue;
5980 if (memcmp(iter->lkb_resource->res_name, name, namelen))
5981 continue;
5982 if (iter->lkb_grmode != mode) {
5983 found_other_mode = 1;
5984 continue;
5985 }
5986
5987 lkb = iter;
5988 list_del_init(&iter->lkb_ownqueue);
5989 iter->lkb_flags &= ~DLM_IFL_ORPHAN;
5990 *lkid = iter->lkb_id;
5991 break;
5992 }
5993 mutex_unlock(&ls->ls_orphans_mutex);
5994
5995 if (!lkb && found_other_mode) {
5996 rv = -EAGAIN;
5997 goto out;
5998 }
5999
6000 if (!lkb) {
6001 rv = -ENOENT;
6002 goto out;
6003 }
6004
6005 lkb->lkb_exflags = flags;
6006 lkb->lkb_ownpid = (int) current->pid;
6007
6008 ua = lkb->lkb_ua;
6009
6010 ua->proc = ua_tmp->proc;
6011 ua->xid = ua_tmp->xid;
6012 ua->castparam = ua_tmp->castparam;
6013 ua->castaddr = ua_tmp->castaddr;
6014 ua->bastparam = ua_tmp->bastparam;
6015 ua->bastaddr = ua_tmp->bastaddr;
6016 ua->user_lksb = ua_tmp->user_lksb;
6017
6018 /*
6019 * The lkb reference from the ls_orphans list was not
6020 * removed above, and is now considered the reference
6021 * for the proc locks list.
6022 */
6023
6024 spin_lock(&ua->proc->locks_spin);
6025 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
6026 spin_unlock(&ua->proc->locks_spin);
6027 out:
6028 kfree(ua_tmp);
6029 return rv;
6030 }
6031
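/* Unlock a userspace lock; on success the lkb is moved to the process's
   unlocking list. */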
6032 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
6033 uint32_t flags, uint32_t lkid, char *lvb_in)
6034 {
6035 struct dlm_lkb *lkb;
6036 struct dlm_args args;
6037 struct dlm_user_args *ua;
6038 int error;
6039
6040 dlm_lock_recovery(ls);
6041
6042 error = find_lkb(ls, lkid, &lkb);
6043 if (error)
6044 goto out;
6045
6046 ua = lkb->lkb_ua;
6047
6048 if (lvb_in && ua->lksb.sb_lvbptr)
6049 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
6050 if (ua_tmp->castparam)
6051 ua->castparam = ua_tmp->castparam;
6052 ua->user_lksb = ua_tmp->user_lksb;
6053
6054 error = set_unlock_args(flags, ua, &args);
6055 if (error)
6056 goto out_put;
6057
6058 error = unlock_lock(ls, lkb, &args);
6059
6060 if (error == -DLM_EUNLOCK)
6061 error = 0;
6062 /* from validate_unlock_args() */
6063 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
6064 error = 0;
6065 if (error)
6066 goto out_put;
6067
6068 spin_lock(&ua->proc->locks_spin);
6069 /* dlm_user_add_cb() may have already taken lkb off the proc list */
6070 if (!list_empty(&lkb->lkb_ownqueue))
6071 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
6072 spin_unlock(&ua->proc->locks_spin);
6073 out_put:
6074 dlm_put_lkb(lkb);
6075 out:
6076 dlm_unlock_recovery(ls);
6077 kfree(ua_tmp);
6078 return error;
6079 }
6080
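/* Cancel an in-progress request or conversion on a userspace lock. */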
6081 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
6082 uint32_t flags, uint32_t lkid)
6083 {
6084 struct dlm_lkb *lkb;
6085 struct dlm_args args;
6086 struct dlm_user_args *ua;
6087 int error;
6088
6089 dlm_lock_recovery(ls);
6090
6091 error = find_lkb(ls, lkid, &lkb);
6092 if (error)
6093 goto out;
6094
6095 ua = lkb->lkb_ua;
6096 if (ua_tmp->castparam)
6097 ua->castparam = ua_tmp->castparam;
6098 ua->user_lksb = ua_tmp->user_lksb;
6099
6100 error = set_unlock_args(flags, ua, &args);
6101 if (error)
6102 goto out_put;
6103
6104 error = cancel_lock(ls, lkb, &args);
6105
6106 if (error == -DLM_ECANCEL)
6107 error = 0;
6108 /* from validate_unlock_args() */
6109 if (error == -EBUSY)
6110 error = 0;
6111 out_put:
6112 dlm_put_lkb(lkb);
6113 out:
6114 dlm_unlock_recovery(ls);
6115 kfree(ua_tmp);
6116 return error;
6117 }
6118
6119 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6120 {
6121 struct dlm_lkb *lkb;
6122 struct dlm_args args;
6123 struct dlm_user_args *ua;
6124 struct dlm_rsb *r;
6125 int error;
6126
6127 dlm_lock_recovery(ls);
6128
6129 error = find_lkb(ls, lkid, &lkb);
6130 if (error)
6131 goto out;
6132
6133 ua = lkb->lkb_ua;
6134
6135 error = set_unlock_args(flags, ua, &args);
6136 if (error)
6137 goto out_put;
6138
6139 /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6140
6141 r = lkb->lkb_resource;
6142 hold_rsb(r);
6143 lock_rsb(r);
6144
6145 error = validate_unlock_args(lkb, &args);
6146 if (error)
6147 goto out_r;
6148 lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
6149
6150 error = _cancel_lock(r, lkb);
6151 out_r:
6152 unlock_rsb(r);
6153 put_rsb(r);
6154
6155 if (error == -DLM_ECANCEL)
6156 error = 0;
6157 /* from validate_unlock_args() */
6158 if (error == -EBUSY)
6159 error = 0;
6160 out_put:
6161 dlm_put_lkb(lkb);
6162 out:
6163 dlm_unlock_recovery(ls);
6164 return error;
6165 }
6166
6167 /* lkb's that are removed from the waiters list by revert are just left on the
6168 orphans list with the granted orphan locks, to be freed by purge */
6169
6170 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6171 {
6172 struct dlm_args args;
6173 int error;
6174
6175 hold_lkb(lkb); /* reference for the ls_orphans list */
6176 mutex_lock(&ls->ls_orphans_mutex);
6177 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6178 mutex_unlock(&ls->ls_orphans_mutex);
6179
6180 set_unlock_args(0, lkb->lkb_ua, &args);
6181
6182 error = cancel_lock(ls, lkb, &args);
6183 if (error == -DLM_ECANCEL)
6184 error = 0;
6185 return error;
6186 }
6187
6188 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6189 granted. Regardless of what rsb queue the lock is on, it's removed and
6190 freed. The IVVALBLK flag causes the lvb on the resource to be invalidated
6191 if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6192
6193 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6194 {
6195 struct dlm_args args;
6196 int error;
6197
6198 set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6199 lkb->lkb_ua, &args);
6200
6201 error = unlock_lock(ls, lkb, &args);
6202 if (error == -DLM_EUNLOCK)
6203 error = 0;
6204 return error;
6205 }
6206
6207 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
6208 (which does lock_rsb) due to deadlock with receiving a message that does
6209 lock_rsb followed by dlm_user_add_cb() */
6210
6211 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6212 struct dlm_user_proc *proc)
6213 {
6214 struct dlm_lkb *lkb = NULL;
6215
6216 mutex_lock(&ls->ls_clear_proc_locks);
6217 if (list_empty(&proc->locks))
6218 goto out;
6219
6220 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6221 list_del_init(&lkb->lkb_ownqueue);
6222
6223 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6224 lkb->lkb_flags |= DLM_IFL_ORPHAN;
6225 else
6226 lkb->lkb_flags |= DLM_IFL_DEAD;
6227 out:
6228 mutex_unlock(&ls->ls_clear_proc_locks);
6229 return lkb;
6230 }
6231
6232 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6233 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6234 which we clear here. */
6235
6236 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6237 list, and no more device_writes should add lkb's to proc->locks list; so we
6238    shouldn't need to take asts_spin or locks_spin here. This assumes that
6239    device reads/writes/closes are serialized -- FIXME: we may need to serialize
6240    them ourselves. */
6241
6242 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6243 {
6244 struct dlm_lkb *lkb, *safe;
6245
6246 dlm_lock_recovery(ls);
6247
6248 while (1) {
6249 lkb = del_proc_lock(ls, proc);
6250 if (!lkb)
6251 break;
6252 del_timeout(lkb);
6253 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6254 orphan_proc_lock(ls, lkb);
6255 else
6256 unlock_proc_lock(ls, lkb);
6257
6258 /* this removes the reference for the proc->locks list
6259 added by dlm_user_request, it may result in the lkb
6260 being freed */
6261
6262 dlm_put_lkb(lkb);
6263 }
6264
6265 mutex_lock(&ls->ls_clear_proc_locks);
6266
6267 /* in-progress unlocks */
6268 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6269 list_del_init(&lkb->lkb_ownqueue);
6270 lkb->lkb_flags |= DLM_IFL_DEAD;
6271 dlm_put_lkb(lkb);
6272 }
6273
6274 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6275 memset(&lkb->lkb_callbacks, 0,
6276 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6277 list_del_init(&lkb->lkb_cb_list);
6278 dlm_put_lkb(lkb);
6279 }
6280
6281 mutex_unlock(&ls->ls_clear_proc_locks);
6282 dlm_unlock_recovery(ls);
6283 }
6284
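/* Force-unlock every lock still held by the process and drop any queued
   callbacks; used when the owning process purges its own locks. */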
6285 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6286 {
6287 struct dlm_lkb *lkb, *safe;
6288
6289 while (1) {
6290 lkb = NULL;
6291 spin_lock(&proc->locks_spin);
6292 if (!list_empty(&proc->locks)) {
6293 lkb = list_entry(proc->locks.next, struct dlm_lkb,
6294 lkb_ownqueue);
6295 list_del_init(&lkb->lkb_ownqueue);
6296 }
6297 spin_unlock(&proc->locks_spin);
6298
6299 if (!lkb)
6300 break;
6301
6302 lkb->lkb_flags |= DLM_IFL_DEAD;
6303 unlock_proc_lock(ls, lkb);
6304 dlm_put_lkb(lkb); /* ref from proc->locks list */
6305 }
6306
6307 spin_lock(&proc->locks_spin);
6308 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6309 list_del_init(&lkb->lkb_ownqueue);
6310 lkb->lkb_flags |= DLM_IFL_DEAD;
6311 dlm_put_lkb(lkb);
6312 }
6313 spin_unlock(&proc->locks_spin);
6314
6315 spin_lock(&proc->asts_spin);
6316 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6317 memset(&lkb->lkb_callbacks, 0,
6318 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6319 list_del_init(&lkb->lkb_cb_list);
6320 dlm_put_lkb(lkb);
6321 }
6322 spin_unlock(&proc->asts_spin);
6323 }
6324
6325 /* pid of 0 means purge all orphans */
6326
6327 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6328 {
6329 struct dlm_lkb *lkb, *safe;
6330
6331 mutex_lock(&ls->ls_orphans_mutex);
6332 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6333 if (pid && lkb->lkb_ownpid != pid)
6334 continue;
6335 unlock_proc_lock(ls, lkb);
6336 list_del_init(&lkb->lkb_ownqueue);
6337 dlm_put_lkb(lkb);
6338 }
6339 mutex_unlock(&ls->ls_orphans_mutex);
6340 }
6341
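/* Ask a remote node, via a DLM_MSG_PURGE message, to purge orphan locks
   owned by the given nodeid/pid. */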
6342 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6343 {
6344 struct dlm_message *ms;
6345 struct dlm_mhandle *mh;
6346 int error;
6347
6348 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6349 DLM_MSG_PURGE, &ms, &mh);
6350 if (error)
6351 return error;
6352 ms->m_nodeid = cpu_to_le32(nodeid);
6353 ms->m_pid = cpu_to_le32(pid);
6354
6355 return send_message(mh, ms);
6356 }
6357
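/* Purge orphan locks for the given nodeid/pid, either locally or by
   sending a purge request to the remote node. */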
6358 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6359 int nodeid, int pid)
6360 {
6361 int error = 0;
6362
6363 if (nodeid && (nodeid != dlm_our_nodeid())) {
6364 error = send_purge(ls, nodeid, pid);
6365 } else {
6366 dlm_lock_recovery(ls);
6367 if (pid == current->pid)
6368 purge_proc_locks(ls, proc);
6369 else
6370 do_purge(ls, nodeid, pid);
6371 dlm_unlock_recovery(ls);
6372 }
6373 return error;
6374 }
6375
6376 /* debug functionality */
6377 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6378 int lkb_nodeid, unsigned int lkb_flags, int lkb_status)
6379 {
6380 struct dlm_lksb *lksb;
6381 struct dlm_lkb *lkb;
6382 struct dlm_rsb *r;
6383 int error;
6384
6385 /* we currently can't set a valid user lock */
6386 if (lkb_flags & DLM_IFL_USER)
6387 return -EOPNOTSUPP;
6388
6389 lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6390 if (!lksb)
6391 return -ENOMEM;
6392
6393 error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6394 if (error) {
6395 kfree(lksb);
6396 return error;
6397 }
6398
6399 lkb->lkb_flags = lkb_flags;
6400 lkb->lkb_nodeid = lkb_nodeid;
6401 lkb->lkb_lksb = lksb;
6402 	/* user-specific pointer; just don't leave it NULL for kernel locks */
6403 if (~lkb_flags & DLM_IFL_USER)
6404 lkb->lkb_astparam = (void *)0xDEADBEEF;
6405
6406 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6407 if (error) {
6408 kfree(lksb);
6409 __put_lkb(ls, lkb);
6410 return error;
6411 }
6412
6413 lock_rsb(r);
6414 attach_lkb(r, lkb);
6415 add_lkb(r, lkb, lkb_status);
6416 unlock_rsb(r);
6417 put_rsb(r);
6418
6419 return 0;
6420 }
6421
6422 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6423 int mstype, int to_nodeid)
6424 {
6425 struct dlm_lkb *lkb;
6426 int error;
6427
6428 error = find_lkb(ls, lkb_id, &lkb);
6429 if (error)
6430 return error;
6431
6432 error = add_to_waiters(lkb, mstype, to_nodeid);
6433 dlm_put_lkb(lkb);
6434 return error;
6435 }
6436
6437