1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11 
12 #include "dlm_internal.h"
13 #include "lockspace.h"
14 #include "member.h"
15 #include "lowcomms.h"
16 #include "rcom.h"
17 #include "config.h"
18 #include "memory.h"
19 #include "recover.h"
20 #include "util.h"
21 #include "lock.h"
22 #include "dir.h"
23 
24 /*
25  * We use the upper 16 bits of the hash value to select the directory node.
26  * Low bits are used for distribution of rsb's among hash buckets on each node.
27  *
28  * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
29  * num_nodes to the hash value.  This value in the desired range is used as an
30  * offset into the sorted list of nodeid's to give the particular nodeid.
31  */
32 
dlm_hash2nodeid(struct dlm_ls * ls,uint32_t hash)33 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
34 {
35 	uint32_t node;
36 
37 	if (ls->ls_num_nodes == 1)
38 		return dlm_our_nodeid();
39 	else {
40 		node = (hash >> 16) % ls->ls_total_weight;
41 		return ls->ls_node_array[node];
42 	}
43 }
44 
dlm_dir_nodeid(struct dlm_rsb * r)45 int dlm_dir_nodeid(struct dlm_rsb *r)
46 {
47 	return r->res_dir_nodeid;
48 }
49 
dlm_recover_dir_nodeid(struct dlm_ls * ls)50 void dlm_recover_dir_nodeid(struct dlm_ls *ls)
51 {
52 	struct dlm_rsb *r;
53 
54 	down_read(&ls->ls_root_sem);
55 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
56 		r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
57 	}
58 	up_read(&ls->ls_root_sem);
59 }
60 
dlm_recover_directory(struct dlm_ls * ls)61 int dlm_recover_directory(struct dlm_ls *ls)
62 {
63 	struct dlm_member *memb;
64 	char *b, *last_name = NULL;
65 	int error = -ENOMEM, last_len, nodeid, result;
66 	uint16_t namelen;
67 	unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
68 
69 	log_rinfo(ls, "dlm_recover_directory");
70 
71 	if (dlm_no_directory(ls))
72 		goto out_status;
73 
74 	last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
75 	if (!last_name)
76 		goto out;
77 
78 	list_for_each_entry(memb, &ls->ls_nodes, list) {
79 		if (memb->nodeid == dlm_our_nodeid())
80 			continue;
81 
82 		memset(last_name, 0, DLM_RESNAME_MAXLEN);
83 		last_len = 0;
84 
85 		for (;;) {
86 			int left;
87 			if (dlm_recovery_stopped(ls)) {
88 				error = -EINTR;
89 				goto out_free;
90 			}
91 
92 			error = dlm_rcom_names(ls, memb->nodeid,
93 					       last_name, last_len);
94 			if (error)
95 				goto out_free;
96 
97 			cond_resched();
98 
99 			/*
100 			 * pick namelen/name pairs out of received buffer
101 			 */
102 
103 			b = ls->ls_recover_buf->rc_buf;
104 			left = le16_to_cpu(ls->ls_recover_buf->rc_header.h_length);
105 			left -= sizeof(struct dlm_rcom);
106 
107 			for (;;) {
108 				__be16 v;
109 
110 				error = -EINVAL;
111 				if (left < sizeof(__be16))
112 					goto out_free;
113 
114 				memcpy(&v, b, sizeof(__be16));
115 				namelen = be16_to_cpu(v);
116 				b += sizeof(__be16);
117 				left -= sizeof(__be16);
118 
119 				/* namelen of 0xFFFFF marks end of names for
120 				   this node; namelen of 0 marks end of the
121 				   buffer */
122 
123 				if (namelen == 0xFFFF)
124 					goto done;
125 				if (!namelen)
126 					break;
127 
128 				if (namelen > left)
129 					goto out_free;
130 
131 				if (namelen > DLM_RESNAME_MAXLEN)
132 					goto out_free;
133 
134 				error = dlm_master_lookup(ls, memb->nodeid,
135 							  b, namelen,
136 							  DLM_LU_RECOVER_DIR,
137 							  &nodeid, &result);
138 				if (error) {
139 					log_error(ls, "recover_dir lookup %d",
140 						  error);
141 					goto out_free;
142 				}
143 
144 				/* The name was found in rsbtbl, but the
145 				 * master nodeid is different from
146 				 * memb->nodeid which says it is the master.
147 				 * This should not happen. */
148 
149 				if (result == DLM_LU_MATCH &&
150 				    nodeid != memb->nodeid) {
151 					count_bad++;
152 					log_error(ls, "recover_dir lookup %d "
153 						  "nodeid %d memb %d bad %u",
154 						  result, nodeid, memb->nodeid,
155 						  count_bad);
156 					print_hex_dump_bytes("dlm_recover_dir ",
157 							     DUMP_PREFIX_NONE,
158 							     b, namelen);
159 				}
160 
161 				/* The name was found in rsbtbl, and the
162 				 * master nodeid matches memb->nodeid. */
163 
164 				if (result == DLM_LU_MATCH &&
165 				    nodeid == memb->nodeid) {
166 					count_match++;
167 				}
168 
169 				/* The name was not found in rsbtbl and was
170 				 * added with memb->nodeid as the master. */
171 
172 				if (result == DLM_LU_ADD) {
173 					count_add++;
174 				}
175 
176 				last_len = namelen;
177 				memcpy(last_name, b, namelen);
178 				b += namelen;
179 				left -= namelen;
180 				count++;
181 			}
182 		}
183 	 done:
184 		;
185 	}
186 
187  out_status:
188 	error = 0;
189 	dlm_set_recover_status(ls, DLM_RS_DIR);
190 
191 	log_rinfo(ls, "dlm_recover_directory %u in %u new",
192 		  count, count_add);
193  out_free:
194 	kfree(last_name);
195  out:
196 	return error;
197 }
198 
find_rsb_root(struct dlm_ls * ls,char * name,int len)199 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
200 {
201 	struct dlm_rsb *r;
202 	uint32_t hash, bucket;
203 	int rv;
204 
205 	hash = jhash(name, len, 0);
206 	bucket = hash & (ls->ls_rsbtbl_size - 1);
207 
208 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
209 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
210 	if (rv)
211 		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
212 					 name, len, &r);
213 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
214 
215 	if (!rv)
216 		return r;
217 
218 	down_read(&ls->ls_root_sem);
219 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
220 		if (len == r->res_length && !memcmp(name, r->res_name, len)) {
221 			up_read(&ls->ls_root_sem);
222 			log_debug(ls, "find_rsb_root revert to root_list %s",
223 				  r->res_name);
224 			return r;
225 		}
226 	}
227 	up_read(&ls->ls_root_sem);
228 	return NULL;
229 }
230 
231 /* Find the rsb where we left off (or start again), then send rsb names
232    for rsb's we're master of and whose directory node matches the requesting
233    node.  inbuf is the rsb name last sent, inlen is the name's length */
234 
dlm_copy_master_names(struct dlm_ls * ls,char * inbuf,int inlen,char * outbuf,int outlen,int nodeid)235 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
236  			   char *outbuf, int outlen, int nodeid)
237 {
238 	struct list_head *list;
239 	struct dlm_rsb *r;
240 	int offset = 0, dir_nodeid;
241 	__be16 be_namelen;
242 
243 	down_read(&ls->ls_root_sem);
244 
245 	if (inlen > 1) {
246 		r = find_rsb_root(ls, inbuf, inlen);
247 		if (!r) {
248 			inbuf[inlen - 1] = '\0';
249 			log_error(ls, "copy_master_names from %d start %d %s",
250 				  nodeid, inlen, inbuf);
251 			goto out;
252 		}
253 		list = r->res_root_list.next;
254 	} else {
255 		list = ls->ls_root_list.next;
256 	}
257 
258 	for (offset = 0; list != &ls->ls_root_list; list = list->next) {
259 		r = list_entry(list, struct dlm_rsb, res_root_list);
260 		if (r->res_nodeid)
261 			continue;
262 
263 		dir_nodeid = dlm_dir_nodeid(r);
264 		if (dir_nodeid != nodeid)
265 			continue;
266 
267 		/*
268 		 * The block ends when we can't fit the following in the
269 		 * remaining buffer space:
270 		 * namelen (uint16_t) +
271 		 * name (r->res_length) +
272 		 * end-of-block record 0x0000 (uint16_t)
273 		 */
274 
275 		if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
276 			/* Write end-of-block record */
277 			be_namelen = cpu_to_be16(0);
278 			memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
279 			offset += sizeof(__be16);
280 			ls->ls_recover_dir_sent_msg++;
281 			goto out;
282 		}
283 
284 		be_namelen = cpu_to_be16(r->res_length);
285 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
286 		offset += sizeof(__be16);
287 		memcpy(outbuf + offset, r->res_name, r->res_length);
288 		offset += r->res_length;
289 		ls->ls_recover_dir_sent_res++;
290 	}
291 
292 	/*
293 	 * If we've reached the end of the list (and there's room) write a
294 	 * terminating record.
295 	 */
296 
297 	if ((list == &ls->ls_root_list) &&
298 	    (offset + sizeof(uint16_t) <= outlen)) {
299 		be_namelen = cpu_to_be16(0xFFFF);
300 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
301 		offset += sizeof(__be16);
302 		ls->ls_recover_dir_sent_msg++;
303 	}
304  out:
305 	up_read(&ls->ls_root_sem);
306 }
307 
308