1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "lowcomms.h"
18 #include "rcom.h"
19 #include "config.h"
20 #include "memory.h"
21 #include "recover.h"
22 #include "util.h"
23 #include "lock.h"
24 #include "dir.h"
25 
26 
put_free_de(struct dlm_ls * ls,struct dlm_direntry * de)27 static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
28 {
29 	spin_lock(&ls->ls_recover_list_lock);
30 	list_add(&de->list, &ls->ls_recover_list);
31 	spin_unlock(&ls->ls_recover_list_lock);
32 }
33 
get_free_de(struct dlm_ls * ls,int len)34 static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
35 {
36 	int found = 0;
37 	struct dlm_direntry *de;
38 
39 	spin_lock(&ls->ls_recover_list_lock);
40 	list_for_each_entry(de, &ls->ls_recover_list, list) {
41 		if (de->length == len) {
42 			list_del(&de->list);
43 			de->master_nodeid = 0;
44 			memset(de->name, 0, len);
45 			found = 1;
46 			break;
47 		}
48 	}
49 	spin_unlock(&ls->ls_recover_list_lock);
50 
51 	if (!found)
52 		de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_NOFS);
53 	return de;
54 }
55 
dlm_clear_free_entries(struct dlm_ls * ls)56 void dlm_clear_free_entries(struct dlm_ls *ls)
57 {
58 	struct dlm_direntry *de;
59 
60 	spin_lock(&ls->ls_recover_list_lock);
61 	while (!list_empty(&ls->ls_recover_list)) {
62 		de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
63 				list);
64 		list_del(&de->list);
65 		kfree(de);
66 	}
67 	spin_unlock(&ls->ls_recover_list_lock);
68 }
69 
70 /*
71  * We use the upper 16 bits of the hash value to select the directory node.
72  * Low bits are used for distribution of rsb's among hash buckets on each node.
73  *
74  * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
75  * num_nodes to the hash value.  This value in the desired range is used as an
76  * offset into the sorted list of nodeid's to give the particular nodeid.
77  */
78 
dlm_hash2nodeid(struct dlm_ls * ls,uint32_t hash)79 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
80 {
81 	struct list_head *tmp;
82 	struct dlm_member *memb = NULL;
83 	uint32_t node, n = 0;
84 	int nodeid;
85 
86 	if (ls->ls_num_nodes == 1) {
87 		nodeid = dlm_our_nodeid();
88 		goto out;
89 	}
90 
91 	if (ls->ls_node_array) {
92 		node = (hash >> 16) % ls->ls_total_weight;
93 		nodeid = ls->ls_node_array[node];
94 		goto out;
95 	}
96 
97 	/* make_member_array() failed to kmalloc ls_node_array... */
98 
99 	node = (hash >> 16) % ls->ls_num_nodes;
100 
101 	list_for_each(tmp, &ls->ls_nodes) {
102 		if (n++ != node)
103 			continue;
104 		memb = list_entry(tmp, struct dlm_member, list);
105 		break;
106 	}
107 
108 	DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
109 				 ls->ls_num_nodes, n, node););
110 	nodeid = memb->nodeid;
111  out:
112 	return nodeid;
113 }
114 
dlm_dir_nodeid(struct dlm_rsb * r)115 int dlm_dir_nodeid(struct dlm_rsb *r)
116 {
117 	return dlm_hash2nodeid(r->res_ls, r->res_hash);
118 }
119 
dir_hash(struct dlm_ls * ls,char * name,int len)120 static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
121 {
122 	uint32_t val;
123 
124 	val = jhash(name, len, 0);
125 	val &= (ls->ls_dirtbl_size - 1);
126 
127 	return val;
128 }
129 
add_entry_to_hash(struct dlm_ls * ls,struct dlm_direntry * de)130 static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
131 {
132 	uint32_t bucket;
133 
134 	bucket = dir_hash(ls, de->name, de->length);
135 	list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
136 }
137 
search_bucket(struct dlm_ls * ls,char * name,int namelen,uint32_t bucket)138 static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
139 					  int namelen, uint32_t bucket)
140 {
141 	struct dlm_direntry *de;
142 
143 	list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
144 		if (de->length == namelen && !memcmp(name, de->name, namelen))
145 			goto out;
146 	}
147 	de = NULL;
148  out:
149 	return de;
150 }
151 
dlm_dir_remove_entry(struct dlm_ls * ls,int nodeid,char * name,int namelen)152 void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
153 {
154 	struct dlm_direntry *de;
155 	uint32_t bucket;
156 
157 	bucket = dir_hash(ls, name, namelen);
158 
159 	spin_lock(&ls->ls_dirtbl[bucket].lock);
160 
161 	de = search_bucket(ls, name, namelen, bucket);
162 
163 	if (!de) {
164 		log_error(ls, "remove fr %u none", nodeid);
165 		goto out;
166 	}
167 
168 	if (de->master_nodeid != nodeid) {
169 		log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
170 		goto out;
171 	}
172 
173 	list_del(&de->list);
174 	kfree(de);
175  out:
176 	spin_unlock(&ls->ls_dirtbl[bucket].lock);
177 }
178 
dlm_dir_clear(struct dlm_ls * ls)179 void dlm_dir_clear(struct dlm_ls *ls)
180 {
181 	struct list_head *head;
182 	struct dlm_direntry *de;
183 	int i;
184 
185 	DLM_ASSERT(list_empty(&ls->ls_recover_list), );
186 
187 	for (i = 0; i < ls->ls_dirtbl_size; i++) {
188 		spin_lock(&ls->ls_dirtbl[i].lock);
189 		head = &ls->ls_dirtbl[i].list;
190 		while (!list_empty(head)) {
191 			de = list_entry(head->next, struct dlm_direntry, list);
192 			list_del(&de->list);
193 			put_free_de(ls, de);
194 		}
195 		spin_unlock(&ls->ls_dirtbl[i].lock);
196 	}
197 }
198 
dlm_recover_directory(struct dlm_ls * ls)199 int dlm_recover_directory(struct dlm_ls *ls)
200 {
201 	struct dlm_member *memb;
202 	struct dlm_direntry *de;
203 	char *b, *last_name = NULL;
204 	int error = -ENOMEM, last_len, count = 0;
205 	uint16_t namelen;
206 
207 	log_debug(ls, "dlm_recover_directory");
208 
209 	if (dlm_no_directory(ls))
210 		goto out_status;
211 
212 	dlm_dir_clear(ls);
213 
214 	last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
215 	if (!last_name)
216 		goto out;
217 
218 	list_for_each_entry(memb, &ls->ls_nodes, list) {
219 		memset(last_name, 0, DLM_RESNAME_MAXLEN);
220 		last_len = 0;
221 
222 		for (;;) {
223 			int left;
224 			error = dlm_recovery_stopped(ls);
225 			if (error)
226 				goto out_free;
227 
228 			error = dlm_rcom_names(ls, memb->nodeid,
229 					       last_name, last_len);
230 			if (error)
231 				goto out_free;
232 
233 			schedule();
234 
235 			/*
236 			 * pick namelen/name pairs out of received buffer
237 			 */
238 
239 			b = ls->ls_recover_buf->rc_buf;
240 			left = ls->ls_recover_buf->rc_header.h_length;
241 			left -= sizeof(struct dlm_rcom);
242 
243 			for (;;) {
244 				__be16 v;
245 
246 				error = -EINVAL;
247 				if (left < sizeof(__be16))
248 					goto out_free;
249 
250 				memcpy(&v, b, sizeof(__be16));
251 				namelen = be16_to_cpu(v);
252 				b += sizeof(__be16);
253 				left -= sizeof(__be16);
254 
255 				/* namelen of 0xFFFFF marks end of names for
256 				   this node; namelen of 0 marks end of the
257 				   buffer */
258 
259 				if (namelen == 0xFFFF)
260 					goto done;
261 				if (!namelen)
262 					break;
263 
264 				if (namelen > left)
265 					goto out_free;
266 
267 				if (namelen > DLM_RESNAME_MAXLEN)
268 					goto out_free;
269 
270 				error = -ENOMEM;
271 				de = get_free_de(ls, namelen);
272 				if (!de)
273 					goto out_free;
274 
275 				de->master_nodeid = memb->nodeid;
276 				de->length = namelen;
277 				last_len = namelen;
278 				memcpy(de->name, b, namelen);
279 				memcpy(last_name, b, namelen);
280 				b += namelen;
281 				left -= namelen;
282 
283 				add_entry_to_hash(ls, de);
284 				count++;
285 			}
286 		}
287          done:
288 		;
289 	}
290 
291  out_status:
292 	error = 0;
293 	log_debug(ls, "dlm_recover_directory %d entries", count);
294  out_free:
295 	kfree(last_name);
296  out:
297 	dlm_clear_free_entries(ls);
298 	return error;
299 }
300 
get_entry(struct dlm_ls * ls,int nodeid,char * name,int namelen,int * r_nodeid)301 static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
302 		     int namelen, int *r_nodeid)
303 {
304 	struct dlm_direntry *de, *tmp;
305 	uint32_t bucket;
306 
307 	bucket = dir_hash(ls, name, namelen);
308 
309 	spin_lock(&ls->ls_dirtbl[bucket].lock);
310 	de = search_bucket(ls, name, namelen, bucket);
311 	if (de) {
312 		*r_nodeid = de->master_nodeid;
313 		spin_unlock(&ls->ls_dirtbl[bucket].lock);
314 		if (*r_nodeid == nodeid)
315 			return -EEXIST;
316 		return 0;
317 	}
318 
319 	spin_unlock(&ls->ls_dirtbl[bucket].lock);
320 
321 	if (namelen > DLM_RESNAME_MAXLEN)
322 		return -EINVAL;
323 
324 	de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_NOFS);
325 	if (!de)
326 		return -ENOMEM;
327 
328 	de->master_nodeid = nodeid;
329 	de->length = namelen;
330 	memcpy(de->name, name, namelen);
331 
332 	spin_lock(&ls->ls_dirtbl[bucket].lock);
333 	tmp = search_bucket(ls, name, namelen, bucket);
334 	if (tmp) {
335 		kfree(de);
336 		de = tmp;
337 	} else {
338 		list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
339 	}
340 	*r_nodeid = de->master_nodeid;
341 	spin_unlock(&ls->ls_dirtbl[bucket].lock);
342 	return 0;
343 }
344 
/*
 * Public directory lookup: forwards to get_entry() and returns its result
 * unchanged, with the master's nodeid stored in *r_nodeid.
 */
int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
		   int *r_nodeid)
{
	int rv = get_entry(ls, nodeid, name, namelen, r_nodeid);

	return rv;
}
350 
find_rsb_root(struct dlm_ls * ls,char * name,int len)351 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
352 {
353 	struct dlm_rsb *r;
354 	uint32_t hash, bucket;
355 	int rv;
356 
357 	hash = jhash(name, len, 0);
358 	bucket = hash & (ls->ls_rsbtbl_size - 1);
359 
360 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
361 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, 0, &r);
362 	if (rv)
363 		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
364 					 name, len, 0, &r);
365 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
366 
367 	if (!rv)
368 		return r;
369 
370 	down_read(&ls->ls_root_sem);
371 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
372 		if (len == r->res_length && !memcmp(name, r->res_name, len)) {
373 			up_read(&ls->ls_root_sem);
374 			log_error(ls, "find_rsb_root revert to root_list %s",
375 				  r->res_name);
376 			return r;
377 		}
378 	}
379 	up_read(&ls->ls_root_sem);
380 	return NULL;
381 }
382 
383 /* Find the rsb where we left off (or start again), then send rsb names
384    for rsb's we're master of and whose directory node matches the requesting
385    node.  inbuf is the rsb name last sent, inlen is the name's length */
386 
dlm_copy_master_names(struct dlm_ls * ls,char * inbuf,int inlen,char * outbuf,int outlen,int nodeid)387 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
388  			   char *outbuf, int outlen, int nodeid)
389 {
390 	struct list_head *list;
391 	struct dlm_rsb *r;
392 	int offset = 0, dir_nodeid;
393 	__be16 be_namelen;
394 
395 	down_read(&ls->ls_root_sem);
396 
397 	if (inlen > 1) {
398 		r = find_rsb_root(ls, inbuf, inlen);
399 		if (!r) {
400 			inbuf[inlen - 1] = '\0';
401 			log_error(ls, "copy_master_names from %d start %d %s",
402 				  nodeid, inlen, inbuf);
403 			goto out;
404 		}
405 		list = r->res_root_list.next;
406 	} else {
407 		list = ls->ls_root_list.next;
408 	}
409 
410 	for (offset = 0; list != &ls->ls_root_list; list = list->next) {
411 		r = list_entry(list, struct dlm_rsb, res_root_list);
412 		if (r->res_nodeid)
413 			continue;
414 
415 		dir_nodeid = dlm_dir_nodeid(r);
416 		if (dir_nodeid != nodeid)
417 			continue;
418 
419 		/*
420 		 * The block ends when we can't fit the following in the
421 		 * remaining buffer space:
422 		 * namelen (uint16_t) +
423 		 * name (r->res_length) +
424 		 * end-of-block record 0x0000 (uint16_t)
425 		 */
426 
427 		if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
428 			/* Write end-of-block record */
429 			be_namelen = cpu_to_be16(0);
430 			memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
431 			offset += sizeof(__be16);
432 			goto out;
433 		}
434 
435 		be_namelen = cpu_to_be16(r->res_length);
436 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
437 		offset += sizeof(__be16);
438 		memcpy(outbuf + offset, r->res_name, r->res_length);
439 		offset += r->res_length;
440 	}
441 
442 	/*
443 	 * If we've reached the end of the list (and there's room) write a
444 	 * terminating record.
445 	 */
446 
447 	if ((list == &ls->ls_root_list) &&
448 	    (offset + sizeof(uint16_t) <= outlen)) {
449 		be_namelen = cpu_to_be16(0xFFFF);
450 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
451 		offset += sizeof(__be16);
452 	}
453 
454  out:
455 	up_read(&ls->ls_root_sem);
456 }
457 
458