1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001 Cluster File Systems, Inc. <braam@clusterfs.com>
5  * Copyright (C) 2001 Tacit Networks, Inc. <phil@off.net>
6  *
7  *   This file is part of InterMezzo, http://www.inter-mezzo.org.
8  *
9  *   InterMezzo is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   InterMezzo is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with InterMezzo; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  * Manage RCVD records for clients in the kernel
23  *
24  */
25 
26 #define __NO_VERSION__
27 #include <linux/module.h>
28 #include <stdarg.h>
29 #include <asm/uaccess.h>
30 
31 #include <linux/errno.h>
32 
33 #include <linux/intermezzo_fs.h>
34 
35 /*
36  * this file contains a hash table of replicators/clients for a
37  * fileset. It allows fast lookup and update of reintegration status
38  */
39 
40 struct izo_offset_rec {
41 	struct list_head or_list;
42 	char             or_uuid[16];
43 	loff_t           or_offset;
44 };
45 
46 #define RCACHE_BITS 8
47 #define RCACHE_SIZE (1 << RCACHE_BITS)
48 #define RCACHE_MASK (RCACHE_SIZE - 1)
49 
50 static struct list_head *
izo_rep_cache(void)51 izo_rep_cache(void)
52 {
53 	int i;
54 	struct list_head *cache;
55 	PRESTO_ALLOC(cache, sizeof(struct list_head) * RCACHE_SIZE);
56 	if (cache == NULL) {
57 		CERROR("intermezzo-fatal: no memory for replicator cache\n");
58                 return NULL;
59 	}
60 	memset(cache, 0, sizeof(struct list_head) * RCACHE_SIZE);
61 	for (i = 0; i < RCACHE_SIZE; i++)
62 		INIT_LIST_HEAD(&cache[i]);
63 
64 	return cache;
65 }
66 
67 static struct list_head *
izo_rep_hash(struct list_head * cache,char * uuid)68 izo_rep_hash(struct list_head *cache, char *uuid)
69 {
70         return &cache[(RCACHE_MASK & uuid[1])];
71 }
72 
73 void
izo_rep_cache_clean(struct presto_file_set * fset)74 izo_rep_cache_clean(struct presto_file_set *fset)
75 {
76 	int i;
77 	struct list_head *bucket;
78 	struct list_head *tmp;
79 
80         if (fset->fset_clients == NULL)
81 		return;
82         for (i = 0; i < RCACHE_SIZE; i++) {
83 
84 				list_for_each_safe(tmp,bucket,&fset->fset_clients[i])
85 				{
86 						struct izo_offset_rec *offrec;
87 						list_del(tmp);
88 						offrec = list_entry(tmp, struct izo_offset_rec,or_list);
89 						PRESTO_FREE(offrec, sizeof(struct izo_offset_rec));
90 				}
91 		}
92 		PRESTO_FREE(fset->fset_clients,sizeof(struct list_head) * RCACHE_SIZE);
93 }
94 
95 struct izo_offset_rec *
izo_rep_cache_find(struct presto_file_set * fset,char * uuid)96 izo_rep_cache_find(struct presto_file_set *fset, char *uuid)
97 {
98 	struct list_head *tmp, *buck = izo_rep_hash(fset->fset_clients, uuid);
99         struct izo_offset_rec *rec = NULL;
100 
101 	list_for_each(tmp, buck) {
102 		rec = list_entry(tmp, struct izo_offset_rec, or_list);
103                 if ( memcmp(rec->or_uuid, uuid, sizeof(rec->or_uuid)) == 0 )
104 			return rec;
105 	}
106 
107 	return NULL;
108 }
109 
110 static int
izo_rep_cache_add(struct presto_file_set * fset,struct izo_rcvd_rec * rec,loff_t offset)111 izo_rep_cache_add(struct presto_file_set *fset, struct izo_rcvd_rec *rec,
112                   loff_t offset)
113 {
114         struct izo_offset_rec *offrec;
115 
116         if (izo_rep_cache_find(fset, rec->lr_uuid)) {
117                 CERROR("izo: duplicate client entry %s off %Ld\n",
118                        fset->fset_name, offset);
119                 return -EINVAL;
120         }
121 
122         PRESTO_ALLOC(offrec, sizeof(*offrec));
123         if (offrec == NULL) {
124                 CERROR("izo: cannot allocate offrec\n");
125                 return -ENOMEM;
126         }
127 
128         memcpy(offrec->or_uuid, rec->lr_uuid, sizeof(rec->lr_uuid));
129         offrec->or_offset = offset;
130 
131         list_add(&offrec->or_list,
132                  izo_rep_hash(fset->fset_clients, rec->lr_uuid));
133         return 0;
134 }
135 
136 int
izo_rep_cache_init(struct presto_file_set * fset)137 izo_rep_cache_init(struct presto_file_set *fset)
138 {
139 	struct izo_rcvd_rec rec;
140         loff_t offset = 0, last_offset = 0;
141 
142 	fset->fset_clients = izo_rep_cache();
143         if (fset->fset_clients == NULL) {
144 		CERROR("Error initializing client cache\n");
145 		return -ENOMEM;
146 	}
147 
148         while ( presto_fread(fset->fset_rcvd.fd_file, (char *)&rec,
149                              sizeof(rec), &offset) == sizeof(rec) ) {
150                 int rc;
151 
152                 if ((rc = izo_rep_cache_add(fset, &rec, last_offset)) < 0) {
153 			izo_rep_cache_clean(fset);
154 			return rc;
155 		}
156 
157                 last_offset = offset;
158 	}
159 
160 	return 0;
161 }
162 
163 /*
164  * Return local last_rcvd record for the client. Update or create
165  * if necessary.
166  *
167  * XXX: After this call, any -EINVAL from izo_rcvd_get is a real error.
168  */
169 int
izo_repstatus(struct presto_file_set * fset,__u64 client_kmlsize,struct izo_rcvd_rec * lr_client,struct izo_rcvd_rec * lr_server)170 izo_repstatus(struct presto_file_set *fset,  __u64 client_kmlsize,
171               struct izo_rcvd_rec *lr_client, struct izo_rcvd_rec *lr_server)
172 {
173         int rc;
174         rc = izo_rcvd_get(lr_server, fset, lr_client->lr_uuid);
175         if (rc < 0 && rc != -EINVAL) {
176                 return rc;
177         }
178 
179         /* client is new or has been reset. */
180         if (rc < 0 || (client_kmlsize == 0 && lr_client->lr_remote_offset == 0)) {
181                 memset(lr_server, 0, sizeof(*lr_server));
182                 memcpy(lr_server->lr_uuid, lr_client->lr_uuid, sizeof(lr_server->lr_uuid));
183                 rc = izo_rcvd_write(fset, lr_server);
184                 if (rc < 0)
185                         return rc;
186         }
187 
188         /* update intersync */
189         rc = izo_upc_repstatus(presto_f2m(fset), fset->fset_name, lr_server);
190         return rc;
191 }
192 
193 loff_t
izo_rcvd_get(struct izo_rcvd_rec * rec,struct presto_file_set * fset,char * uuid)194 izo_rcvd_get(struct izo_rcvd_rec *rec, struct presto_file_set *fset, char *uuid)
195 {
196         struct izo_offset_rec *offrec;
197         struct izo_rcvd_rec tmprec;
198         loff_t offset;
199 
200         offrec = izo_rep_cache_find(fset, uuid);
201         if (offrec == NULL) {
202                 CDEBUG(D_SPECIAL, "izo_get_rcvd: uuid not in hash.\n");
203                 return -EINVAL;
204         }
205         offset = offrec->or_offset;
206 
207         if (rec == NULL)
208                 return offset;
209 
210         if (presto_fread(fset->fset_rcvd.fd_file, (char *)&tmprec,
211                          sizeof(tmprec), &offset) != sizeof(tmprec)) {
212                 CERROR("izo_get_rcvd: Unable to read from last_rcvd file offset "
213                        "%Lu\n", offset);
214                 return -EIO;
215         }
216 
217         memcpy(rec->lr_uuid, tmprec.lr_uuid, sizeof(tmprec.lr_uuid));
218         rec->lr_remote_recno = le64_to_cpu(tmprec.lr_remote_recno);
219         rec->lr_remote_offset = le64_to_cpu(tmprec.lr_remote_offset);
220         rec->lr_local_recno = le64_to_cpu(tmprec.lr_local_recno);
221         rec->lr_local_offset = le64_to_cpu(tmprec.lr_local_offset);
222         rec->lr_last_ctime = le64_to_cpu(tmprec.lr_last_ctime);
223 
224         return offrec->or_offset;
225 }
226 
227 /* Try to lookup the UUID in the hash.  Insert it if it isn't found.  Write the
228  * data to the file.
229  *
230  * Returns the offset of the beginning of the record in the last_rcvd file. */
231 loff_t
izo_rcvd_write(struct presto_file_set * fset,struct izo_rcvd_rec * rec)232 izo_rcvd_write(struct presto_file_set *fset, struct izo_rcvd_rec *rec)
233 {
234         struct izo_offset_rec *offrec;
235         loff_t offset, rc;
236 
237         ENTRY;
238 
239         offrec = izo_rep_cache_find(fset, rec->lr_uuid);
240         if (offrec == NULL) {
241                 /* I don't think it should be possible for an entry to be not in
242                  * the hash table without also having an invalid offset, but we
243                  * handle it gracefully regardless. */
244                 write_lock(&fset->fset_rcvd.fd_lock);
245                 offset = fset->fset_rcvd.fd_offset;
246                 fset->fset_rcvd.fd_offset += sizeof(*rec);
247                 write_unlock(&fset->fset_rcvd.fd_lock);
248 
249                 rc = izo_rep_cache_add(fset, rec, offset);
250                 if (rc < 0) {
251                         EXIT;
252                         return rc;
253                 }
254         } else
255                 offset = offrec->or_offset;
256 
257 
258         rc = presto_fwrite(fset->fset_rcvd.fd_file, (char *)rec, sizeof(*rec),
259                            &offset);
260         if (rc == sizeof(*rec))
261                 /* presto_fwrite() advances 'offset' */
262                 rc = offset - sizeof(*rec);
263 
264         EXIT;
265         return rc;
266 }
267 
268 loff_t
izo_rcvd_upd_remote(struct presto_file_set * fset,char * uuid,__u64 remote_recno,__u64 remote_offset)269 izo_rcvd_upd_remote(struct presto_file_set *fset, char * uuid,  __u64 remote_recno,
270                     __u64 remote_offset)
271 {
272         struct izo_rcvd_rec rec;
273 
274         loff_t rc;
275 
276         ENTRY;
277         rc = izo_rcvd_get(&rec, fset, uuid);
278         if (rc < 0)
279                 return rc;
280         rec.lr_remote_recno = remote_recno;
281         rec.lr_remote_offset = remote_offset;
282 
283         rc = izo_rcvd_write(fset, &rec);
284         EXIT;
285         if (rc < 0)
286                 return rc;
287         return 0;
288 }
289