1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "lowcomms.h"
18 #include "rcom.h"
19 #include "config.h"
20 #include "memory.h"
21 #include "recover.h"
22 #include "util.h"
23 #include "lock.h"
24 #include "dir.h"
25 
26 
put_free_de(struct dlm_ls * ls,struct dlm_direntry * de)27 static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
28 {
29 	spin_lock(&ls->ls_recover_list_lock);
30 	list_add(&de->list, &ls->ls_recover_list);
31 	spin_unlock(&ls->ls_recover_list_lock);
32 }
33 
get_free_de(struct dlm_ls * ls,int len)34 static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
35 {
36 	int found = 0;
37 	struct dlm_direntry *de;
38 
39 	spin_lock(&ls->ls_recover_list_lock);
40 	list_for_each_entry(de, &ls->ls_recover_list, list) {
41 		if (de->length == len) {
42 			list_del(&de->list);
43 			de->master_nodeid = 0;
44 			memset(de->name, 0, len);
45 			found = 1;
46 			break;
47 		}
48 	}
49 	spin_unlock(&ls->ls_recover_list_lock);
50 
51 	if (!found)
52 		de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_NOFS);
53 	return de;
54 }
55 
dlm_clear_free_entries(struct dlm_ls * ls)56 void dlm_clear_free_entries(struct dlm_ls *ls)
57 {
58 	struct dlm_direntry *de;
59 
60 	spin_lock(&ls->ls_recover_list_lock);
61 	while (!list_empty(&ls->ls_recover_list)) {
62 		de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
63 				list);
64 		list_del(&de->list);
65 		kfree(de);
66 	}
67 	spin_unlock(&ls->ls_recover_list_lock);
68 }
69 
70 /*
71  * We use the upper 16 bits of the hash value to select the directory node.
72  * Low bits are used for distribution of rsb's among hash buckets on each node.
73  *
74  * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
75  * num_nodes to the hash value.  This value in the desired range is used as an
76  * offset into the sorted list of nodeid's to give the particular nodeid.
77  */
78 
dlm_hash2nodeid(struct dlm_ls * ls,uint32_t hash)79 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
80 {
81 	struct list_head *tmp;
82 	struct dlm_member *memb = NULL;
83 	uint32_t node, n = 0;
84 	int nodeid;
85 
86 	if (ls->ls_num_nodes == 1) {
87 		nodeid = dlm_our_nodeid();
88 		goto out;
89 	}
90 
91 	if (ls->ls_node_array) {
92 		node = (hash >> 16) % ls->ls_total_weight;
93 		nodeid = ls->ls_node_array[node];
94 		goto out;
95 	}
96 
97 	/* make_member_array() failed to kmalloc ls_node_array... */
98 
99 	node = (hash >> 16) % ls->ls_num_nodes;
100 
101 	list_for_each(tmp, &ls->ls_nodes) {
102 		if (n++ != node)
103 			continue;
104 		memb = list_entry(tmp, struct dlm_member, list);
105 		break;
106 	}
107 
108 	DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
109 				 ls->ls_num_nodes, n, node););
110 	nodeid = memb->nodeid;
111  out:
112 	return nodeid;
113 }
114 
dlm_dir_nodeid(struct dlm_rsb * r)115 int dlm_dir_nodeid(struct dlm_rsb *r)
116 {
117 	return dlm_hash2nodeid(r->res_ls, r->res_hash);
118 }
119 
dir_hash(struct dlm_ls * ls,char * name,int len)120 static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
121 {
122 	uint32_t val;
123 
124 	val = jhash(name, len, 0);
125 	val &= (ls->ls_dirtbl_size - 1);
126 
127 	return val;
128 }
129 
add_entry_to_hash(struct dlm_ls * ls,struct dlm_direntry * de)130 static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
131 {
132 	uint32_t bucket;
133 
134 	bucket = dir_hash(ls, de->name, de->length);
135 	list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
136 }
137 
search_bucket(struct dlm_ls * ls,char * name,int namelen,uint32_t bucket)138 static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
139 					  int namelen, uint32_t bucket)
140 {
141 	struct dlm_direntry *de;
142 
143 	list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
144 		if (de->length == namelen && !memcmp(name, de->name, namelen))
145 			goto out;
146 	}
147 	de = NULL;
148  out:
149 	return de;
150 }
151 
dlm_dir_remove_entry(struct dlm_ls * ls,int nodeid,char * name,int namelen)152 void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
153 {
154 	struct dlm_direntry *de;
155 	uint32_t bucket;
156 
157 	bucket = dir_hash(ls, name, namelen);
158 
159 	spin_lock(&ls->ls_dirtbl[bucket].lock);
160 
161 	de = search_bucket(ls, name, namelen, bucket);
162 
163 	if (!de) {
164 		log_error(ls, "remove fr %u none", nodeid);
165 		goto out;
166 	}
167 
168 	if (de->master_nodeid != nodeid) {
169 		log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
170 		goto out;
171 	}
172 
173 	list_del(&de->list);
174 	kfree(de);
175  out:
176 	spin_unlock(&ls->ls_dirtbl[bucket].lock);
177 }
178 
dlm_dir_clear(struct dlm_ls * ls)179 void dlm_dir_clear(struct dlm_ls *ls)
180 {
181 	struct list_head *head;
182 	struct dlm_direntry *de;
183 	int i;
184 
185 	DLM_ASSERT(list_empty(&ls->ls_recover_list), );
186 
187 	for (i = 0; i < ls->ls_dirtbl_size; i++) {
188 		spin_lock(&ls->ls_dirtbl[i].lock);
189 		head = &ls->ls_dirtbl[i].list;
190 		while (!list_empty(head)) {
191 			de = list_entry(head->next, struct dlm_direntry, list);
192 			list_del(&de->list);
193 			put_free_de(ls, de);
194 		}
195 		spin_unlock(&ls->ls_dirtbl[i].lock);
196 	}
197 }
198 
dlm_recover_directory(struct dlm_ls * ls)199 int dlm_recover_directory(struct dlm_ls *ls)
200 {
201 	struct dlm_member *memb;
202 	struct dlm_direntry *de;
203 	char *b, *last_name = NULL;
204 	int error = -ENOMEM, last_len, count = 0;
205 	uint16_t namelen;
206 
207 	log_debug(ls, "dlm_recover_directory");
208 
209 	if (dlm_no_directory(ls))
210 		goto out_status;
211 
212 	dlm_dir_clear(ls);
213 
214 	last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
215 	if (!last_name)
216 		goto out;
217 
218 	list_for_each_entry(memb, &ls->ls_nodes, list) {
219 		memset(last_name, 0, DLM_RESNAME_MAXLEN);
220 		last_len = 0;
221 
222 		for (;;) {
223 			int left;
224 			error = dlm_recovery_stopped(ls);
225 			if (error)
226 				goto out_free;
227 
228 			error = dlm_rcom_names(ls, memb->nodeid,
229 					       last_name, last_len);
230 			if (error)
231 				goto out_free;
232 
233 			schedule();
234 
235 			/*
236 			 * pick namelen/name pairs out of received buffer
237 			 */
238 
239 			b = ls->ls_recover_buf->rc_buf;
240 			left = ls->ls_recover_buf->rc_header.h_length;
241 			left -= sizeof(struct dlm_rcom);
242 
243 			for (;;) {
244 				__be16 v;
245 
246 				error = -EINVAL;
247 				if (left < sizeof(__be16))
248 					goto out_free;
249 
250 				memcpy(&v, b, sizeof(__be16));
251 				namelen = be16_to_cpu(v);
252 				b += sizeof(__be16);
253 				left -= sizeof(__be16);
254 
255 				/* namelen of 0xFFFFF marks end of names for
256 				   this node; namelen of 0 marks end of the
257 				   buffer */
258 
259 				if (namelen == 0xFFFF)
260 					goto done;
261 				if (!namelen)
262 					break;
263 
264 				if (namelen > left)
265 					goto out_free;
266 
267 				if (namelen > DLM_RESNAME_MAXLEN)
268 					goto out_free;
269 
270 				error = -ENOMEM;
271 				de = get_free_de(ls, namelen);
272 				if (!de)
273 					goto out_free;
274 
275 				de->master_nodeid = memb->nodeid;
276 				de->length = namelen;
277 				last_len = namelen;
278 				memcpy(de->name, b, namelen);
279 				memcpy(last_name, b, namelen);
280 				b += namelen;
281 				left -= namelen;
282 
283 				add_entry_to_hash(ls, de);
284 				count++;
285 			}
286 		}
287          done:
288 		;
289 	}
290 
291  out_status:
292 	error = 0;
293 	dlm_set_recover_status(ls, DLM_RS_DIR);
294 	log_debug(ls, "dlm_recover_directory %d entries", count);
295  out_free:
296 	kfree(last_name);
297  out:
298 	dlm_clear_free_entries(ls);
299 	return error;
300 }
301 
get_entry(struct dlm_ls * ls,int nodeid,char * name,int namelen,int * r_nodeid)302 static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
303 		     int namelen, int *r_nodeid)
304 {
305 	struct dlm_direntry *de, *tmp;
306 	uint32_t bucket;
307 
308 	bucket = dir_hash(ls, name, namelen);
309 
310 	spin_lock(&ls->ls_dirtbl[bucket].lock);
311 	de = search_bucket(ls, name, namelen, bucket);
312 	if (de) {
313 		*r_nodeid = de->master_nodeid;
314 		spin_unlock(&ls->ls_dirtbl[bucket].lock);
315 		if (*r_nodeid == nodeid)
316 			return -EEXIST;
317 		return 0;
318 	}
319 
320 	spin_unlock(&ls->ls_dirtbl[bucket].lock);
321 
322 	if (namelen > DLM_RESNAME_MAXLEN)
323 		return -EINVAL;
324 
325 	de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_NOFS);
326 	if (!de)
327 		return -ENOMEM;
328 
329 	de->master_nodeid = nodeid;
330 	de->length = namelen;
331 	memcpy(de->name, name, namelen);
332 
333 	spin_lock(&ls->ls_dirtbl[bucket].lock);
334 	tmp = search_bucket(ls, name, namelen, bucket);
335 	if (tmp) {
336 		kfree(de);
337 		de = tmp;
338 	} else {
339 		list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
340 	}
341 	*r_nodeid = de->master_nodeid;
342 	spin_unlock(&ls->ls_dirtbl[bucket].lock);
343 	return 0;
344 }
345 
/*
 * Public entry point for a directory lookup: forwards all arguments to
 * get_entry() and returns its result unchanged.
 */
int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
		   int *r_nodeid)
{
	int rv;

	rv = get_entry(ls, nodeid, name, namelen, r_nodeid);
	return rv;
}
351 
find_rsb_root(struct dlm_ls * ls,char * name,int len)352 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
353 {
354 	struct dlm_rsb *r;
355 
356 	down_read(&ls->ls_root_sem);
357 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
358 		if (len == r->res_length && !memcmp(name, r->res_name, len)) {
359 			up_read(&ls->ls_root_sem);
360 			return r;
361 		}
362 	}
363 	up_read(&ls->ls_root_sem);
364 	return NULL;
365 }
366 
367 /* Find the rsb where we left off (or start again), then send rsb names
368    for rsb's we're master of and whose directory node matches the requesting
369    node.  inbuf is the rsb name last sent, inlen is the name's length */
370 
dlm_copy_master_names(struct dlm_ls * ls,char * inbuf,int inlen,char * outbuf,int outlen,int nodeid)371 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
372  			   char *outbuf, int outlen, int nodeid)
373 {
374 	struct list_head *list;
375 	struct dlm_rsb *r;
376 	int offset = 0, dir_nodeid;
377 	__be16 be_namelen;
378 
379 	down_read(&ls->ls_root_sem);
380 
381 	if (inlen > 1) {
382 		r = find_rsb_root(ls, inbuf, inlen);
383 		if (!r) {
384 			inbuf[inlen - 1] = '\0';
385 			log_error(ls, "copy_master_names from %d start %d %s",
386 				  nodeid, inlen, inbuf);
387 			goto out;
388 		}
389 		list = r->res_root_list.next;
390 	} else {
391 		list = ls->ls_root_list.next;
392 	}
393 
394 	for (offset = 0; list != &ls->ls_root_list; list = list->next) {
395 		r = list_entry(list, struct dlm_rsb, res_root_list);
396 		if (r->res_nodeid)
397 			continue;
398 
399 		dir_nodeid = dlm_dir_nodeid(r);
400 		if (dir_nodeid != nodeid)
401 			continue;
402 
403 		/*
404 		 * The block ends when we can't fit the following in the
405 		 * remaining buffer space:
406 		 * namelen (uint16_t) +
407 		 * name (r->res_length) +
408 		 * end-of-block record 0x0000 (uint16_t)
409 		 */
410 
411 		if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
412 			/* Write end-of-block record */
413 			be_namelen = cpu_to_be16(0);
414 			memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
415 			offset += sizeof(__be16);
416 			goto out;
417 		}
418 
419 		be_namelen = cpu_to_be16(r->res_length);
420 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
421 		offset += sizeof(__be16);
422 		memcpy(outbuf + offset, r->res_name, r->res_length);
423 		offset += r->res_length;
424 	}
425 
426 	/*
427 	 * If we've reached the end of the list (and there's room) write a
428 	 * terminating record.
429 	 */
430 
431 	if ((list == &ls->ls_root_list) &&
432 	    (offset + sizeof(uint16_t) <= outlen)) {
433 		be_namelen = cpu_to_be16(0xFFFF);
434 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
435 		offset += sizeof(__be16);
436 	}
437 
438  out:
439 	up_read(&ls->ls_root_sem);
440 }
441 
442