1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24  */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37 
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40 
41 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42 static int w_make_resync_request(struct drbd_conf *mdev,
43 				 struct drbd_work *w, int cancel);
44 
45 
46 
47 /* endio handlers:
48  *   drbd_md_io_complete (defined here)
49  *   drbd_endio_pri (defined here)
50  *   drbd_endio_sec (defined here)
51  *   bm_async_io_complete (defined in drbd_bitmap.c)
52  *
53  * For all these callbacks, note the following:
54  * The callbacks will be called in irq context by the IDE drivers,
55  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
56  * Try to get the locking right :)
57  *
58  */
59 
60 
/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync-after dependencies, we grab a write lock, because
   we need stable states on all devices for that.  */
65 rwlock_t global_state_lock;
66 
67 /* used for synchronous meta data and bitmap IO
68  * submitted by drbd_md_sync_page_io()
69  */
void drbd_md_io_complete(struct bio *bio, int error)
71 {
72 	struct drbd_md_io *md_io;
73 
74 	md_io = (struct drbd_md_io *)bio->bi_private;
75 	md_io->error = error;
76 
77 	complete(&md_io->event);
78 }
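/* Illustrative usage only (an editorial sketch, not verbatim driver code):
 * a caller such as drbd_md_sync_page_io() is assumed to pair this completion
 * handler with a synchronous wait roughly like so:
 *
 *	struct drbd_md_io md_io;
 *
 *	init_completion(&md_io.event);
 *	bio->bi_private = &md_io;
 *	bio->bi_end_io  = drbd_md_io_complete;
 *	submit_bio(rw, bio);
 *	wait_for_completion(&md_io.event);
 *	if (md_io.error)
 *		... handle the failed meta data / bitmap IO ...
 *
 * i.e. the error code is handed back through struct drbd_md_io rather than
 * through the bio itself.
 */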
79 
80 /* reads on behalf of the partner,
81  * "submitted" by the receiver
82  */
void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
84 {
85 	unsigned long flags = 0;
86 	struct drbd_conf *mdev = e->mdev;
87 
88 	D_ASSERT(e->block_id != ID_VACANT);
89 
90 	spin_lock_irqsave(&mdev->req_lock, flags);
91 	mdev->read_cnt += e->size >> 9;
92 	list_del(&e->w.list);
93 	if (list_empty(&mdev->read_ee))
94 		wake_up(&mdev->ee_wait);
95 	if (test_bit(__EE_WAS_ERROR, &e->flags))
96 		__drbd_chk_io_error(mdev, false);
97 	spin_unlock_irqrestore(&mdev->req_lock, flags);
98 
99 	drbd_queue_work(&mdev->data.work, &e->w);
100 	put_ldev(mdev);
101 }
102 
103 /* writes on behalf of the partner, or resync writes,
104  * "submitted" by the receiver, final stage.  */
static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
106 {
107 	unsigned long flags = 0;
108 	struct drbd_conf *mdev = e->mdev;
109 	sector_t e_sector;
110 	int do_wake;
111 	int is_syncer_req;
112 	int do_al_complete_io;
113 
114 	D_ASSERT(e->block_id != ID_VACANT);
115 
116 	/* after we moved e to done_ee,
117 	 * we may no longer access it,
118 	 * it may be freed/reused already!
119 	 * (as soon as we release the req_lock) */
120 	e_sector = e->sector;
121 	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
122 	is_syncer_req = is_syncer_block_id(e->block_id);
123 
124 	spin_lock_irqsave(&mdev->req_lock, flags);
125 	mdev->writ_cnt += e->size >> 9;
126 	list_del(&e->w.list); /* has been on active_ee or sync_ee */
127 	list_add_tail(&e->w.list, &mdev->done_ee);
128 
129 	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
130 	 * neither did we wake possibly waiting conflicting requests.
131 	 * done from "drbd_process_done_ee" within the appropriate w.cb
132 	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
133 
134 	do_wake = is_syncer_req
135 		? list_empty(&mdev->sync_ee)
136 		: list_empty(&mdev->active_ee);
137 
138 	if (test_bit(__EE_WAS_ERROR, &e->flags))
139 		__drbd_chk_io_error(mdev, false);
140 	spin_unlock_irqrestore(&mdev->req_lock, flags);
141 
142 	if (is_syncer_req)
143 		drbd_rs_complete_io(mdev, e_sector);
144 
145 	if (do_wake)
146 		wake_up(&mdev->ee_wait);
147 
148 	if (do_al_complete_io)
149 		drbd_al_complete_io(mdev, e_sector);
150 
151 	wake_asender(mdev);
152 	put_ldev(mdev);
153 }
154 
155 /* writes on behalf of the partner, or resync writes,
156  * "submitted" by the receiver.
157  */
void drbd_endio_sec(struct bio *bio, int error)
159 {
160 	struct drbd_epoch_entry *e = bio->bi_private;
161 	struct drbd_conf *mdev = e->mdev;
162 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
163 	int is_write = bio_data_dir(bio) == WRITE;
164 
165 	if (error && __ratelimit(&drbd_ratelimit_state))
166 		dev_warn(DEV, "%s: error=%d s=%llus\n",
167 				is_write ? "write" : "read", error,
168 				(unsigned long long)e->sector);
169 	if (!error && !uptodate) {
170 		if (__ratelimit(&drbd_ratelimit_state))
171 			dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
172 					is_write ? "write" : "read",
173 					(unsigned long long)e->sector);
174 		/* strange behavior of some lower level drivers...
175 		 * fail the request by clearing the uptodate flag,
176 		 * but do not return any error?! */
177 		error = -EIO;
178 	}
179 
180 	if (error)
181 		set_bit(__EE_WAS_ERROR, &e->flags);
182 
183 	bio_put(bio); /* no need for the bio anymore */
184 	if (atomic_dec_and_test(&e->pending_bios)) {
185 		if (is_write)
186 			drbd_endio_write_sec_final(e);
187 		else
188 			drbd_endio_read_sec_final(e);
189 	}
190 }
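/* Note (editorial): one epoch entry may be backed by more than one bio if
 * drbd_submit_ee() had to split it (e.g. because bio_add_page() refused a
 * page).  Each bio completion above only drops e->pending_bios; the last
 * completion is the one that hands the entry on to the read/write "final"
 * handlers, so those run exactly once per epoch entry.
 */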
191 
192 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
193  */
void drbd_endio_pri(struct bio *bio, int error)
195 {
196 	unsigned long flags;
197 	struct drbd_request *req = bio->bi_private;
198 	struct drbd_conf *mdev = req->mdev;
199 	struct bio_and_error m;
200 	enum drbd_req_event what;
201 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
202 
203 	if (!error && !uptodate) {
204 		dev_warn(DEV, "p %s: setting error to -EIO\n",
205 			 bio_data_dir(bio) == WRITE ? "write" : "read");
206 		/* strange behavior of some lower level drivers...
207 		 * fail the request by clearing the uptodate flag,
208 		 * but do not return any error?! */
209 		error = -EIO;
210 	}
211 
212 	/* to avoid recursion in __req_mod */
213 	if (unlikely(error)) {
214 		what = (bio_data_dir(bio) == WRITE)
215 			? write_completed_with_error
216 			: (bio_rw(bio) == READ)
217 			  ? read_completed_with_error
218 			  : read_ahead_completed_with_error;
219 	} else
220 		what = completed_ok;
221 
222 	bio_put(req->private_bio);
223 	req->private_bio = ERR_PTR(error);
224 
225 	/* not req_mod(), we need irqsave here! */
226 	spin_lock_irqsave(&mdev->req_lock, flags);
227 	__req_mod(req, what, &m);
228 	spin_unlock_irqrestore(&mdev->req_lock, flags);
229 
230 	if (m.bio)
231 		complete_master_bio(mdev, &m);
232 }
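/* Note (editorial): the bio_rw() check above distinguishes plain reads from
 * readahead (READA).  A failed readahead is reported as
 * read_ahead_completed_with_error, which __req_mod() is expected to complete
 * without escalating it to a disk error, since readahead may legally fail.
 */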
233 
int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
235 {
236 	struct drbd_request *req = container_of(w, struct drbd_request, w);
237 
238 	/* We should not detach for read io-error,
239 	 * but try to WRITE the P_DATA_REPLY to the failed location,
240 	 * to give the disk the chance to relocate that block */
241 
242 	spin_lock_irq(&mdev->req_lock);
243 	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
244 		_req_mod(req, read_retry_remote_canceled);
245 		spin_unlock_irq(&mdev->req_lock);
246 		return 1;
247 	}
248 	spin_unlock_irq(&mdev->req_lock);
249 
250 	return w_send_read_req(mdev, w, 0);
251 }
252 
void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
254 {
255 	struct hash_desc desc;
256 	struct scatterlist sg;
257 	struct page *page = e->pages;
258 	struct page *tmp;
259 	unsigned len;
260 
261 	desc.tfm = tfm;
262 	desc.flags = 0;
263 
264 	sg_init_table(&sg, 1);
265 	crypto_hash_init(&desc);
266 
267 	while ((tmp = page_chain_next(page))) {
268 		/* all but the last page will be fully used */
269 		sg_set_page(&sg, page, PAGE_SIZE, 0);
270 		crypto_hash_update(&desc, &sg, sg.length);
271 		page = tmp;
272 	}
273 	/* and now the last, possibly only partially used page */
274 	len = e->size & (PAGE_SIZE - 1);
275 	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
276 	crypto_hash_update(&desc, &sg, sg.length);
277 	crypto_hash_final(&desc, digest);
278 }
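/* Worked example for the tail handling above, assuming PAGE_SIZE == 4096:
 * for e->size == 9216 (two full pages + 1024 bytes) the loop hashes two
 * full pages, then len = 9216 & 4095 = 1024, so only the used part of the
 * last page enters the digest.  For a size that is a multiple of PAGE_SIZE,
 * len is 0 and the "?:" falls back to PAGE_SIZE, i.e. the last page is
 * hashed in full.
 */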
279 
void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
281 {
282 	struct hash_desc desc;
283 	struct scatterlist sg;
284 	struct bio_vec *bvec;
285 	int i;
286 
287 	desc.tfm = tfm;
288 	desc.flags = 0;
289 
290 	sg_init_table(&sg, 1);
291 	crypto_hash_init(&desc);
292 
293 	__bio_for_each_segment(bvec, bio, i, 0) {
294 		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
295 		crypto_hash_update(&desc, &sg, sg.length);
296 	}
297 	crypto_hash_final(&desc, digest);
298 }
299 
static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
301 {
302 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
303 	int digest_size;
304 	void *digest;
305 	int ok;
306 
307 	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
308 
309 	if (unlikely(cancel)) {
310 		drbd_free_ee(mdev, e);
311 		return 1;
312 	}
313 
314 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
315 		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
316 		digest = kmalloc(digest_size, GFP_NOIO);
317 		if (digest) {
318 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
319 
320 			inc_rs_pending(mdev);
321 			ok = drbd_send_drequest_csum(mdev,
322 						     e->sector,
323 						     e->size,
324 						     digest,
325 						     digest_size,
326 						     P_CSUM_RS_REQUEST);
327 			kfree(digest);
328 		} else {
329 			dev_err(DEV, "kmalloc() of digest failed.\n");
330 			ok = 0;
331 		}
332 	} else
333 		ok = 1;
334 
335 	drbd_free_ee(mdev, e);
336 
337 	if (unlikely(!ok))
338 		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
339 	return ok;
340 }
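/* Editorial note on the checksum based resync round trip as implemented in
 * this file: the SyncTarget reads the block locally, hashes it with
 * csums_tfm and sends only the digest in a P_CSUM_RS_REQUEST.  The
 * SyncSource compares that digest against its own copy of the block (see
 * w_e_end_csum_rs_req() below) and answers either with P_RS_IS_IN_SYNC or
 * with the full block in a P_RS_DATA_REPLY.
 */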
341 
342 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
343 
static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
345 {
346 	struct drbd_epoch_entry *e;
347 
348 	if (!get_ldev(mdev))
349 		return -EIO;
350 
351 	if (drbd_rs_should_slow_down(mdev, sector))
352 		goto defer;
353 
354 	/* GFP_TRY, because if there is no memory available right now, this may
355 	 * be rescheduled for later. It is "only" background resync, after all. */
356 	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
357 	if (!e)
358 		goto defer;
359 
360 	e->w.cb = w_e_send_csum;
361 	spin_lock_irq(&mdev->req_lock);
362 	list_add(&e->w.list, &mdev->read_ee);
363 	spin_unlock_irq(&mdev->req_lock);
364 
365 	atomic_add(size >> 9, &mdev->rs_sect_ev);
366 	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
367 		return 0;
368 
369 	/* If it failed because of ENOMEM, retry should help.  If it failed
370 	 * because bio_add_page failed (probably broken lower level driver),
371 	 * retry may or may not help.
372 	 * If it does not, you may need to force disconnect. */
373 	spin_lock_irq(&mdev->req_lock);
374 	list_del(&e->w.list);
375 	spin_unlock_irq(&mdev->req_lock);
376 
377 	drbd_free_ee(mdev, e);
378 defer:
379 	put_ldev(mdev);
380 	return -EAGAIN;
381 }
382 
int w_resync_timer(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
384 {
385 	switch (mdev->state.conn) {
386 	case C_VERIFY_S:
387 		w_make_ov_request(mdev, w, cancel);
388 		break;
389 	case C_SYNC_TARGET:
390 		w_make_resync_request(mdev, w, cancel);
391 		break;
392 	}
393 
394 	return 1;
395 }
396 
void resync_timer_fn(unsigned long data)
398 {
399 	struct drbd_conf *mdev = (struct drbd_conf *) data;
400 
401 	if (list_empty(&mdev->resync_work.list))
402 		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
403 }
404 
static void fifo_set(struct fifo_buffer *fb, int value)
406 {
407 	int i;
408 
409 	for (i = 0; i < fb->size; i++)
410 		fb->values[i] = value;
411 }
412 
static int fifo_push(struct fifo_buffer *fb, int value)
414 {
415 	int ov;
416 
417 	ov = fb->values[fb->head_index];
418 	fb->values[fb->head_index++] = value;
419 
420 	if (fb->head_index >= fb->size)
421 		fb->head_index = 0;
422 
423 	return ov;
424 }
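/* Small example of the ring semantics above: with fb->size == 3,
 * values == {5, 7, 9} and head_index == 0, fifo_push(fb, 0) returns the
 * value stored at the head slot (5), overwrites that slot with the pushed
 * value (0) and advances head_index to 1.  The resync controller below uses
 * this to consume the correction planned for the current step while leaving
 * an empty slot for a future step.
 */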
425 
static void fifo_add_val(struct fifo_buffer *fb, int value)
427 {
428 	int i;
429 
430 	for (i = 0; i < fb->size; i++)
431 		fb->values[i] += value;
432 }
433 
static int drbd_rs_controller(struct drbd_conf *mdev)
435 {
436 	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
437 	unsigned int want;     /* The number of sectors we want in the proxy */
438 	int req_sect; /* Number of sectors to request in this turn */
439 	int correction; /* Number of sectors more we need in the proxy*/
440 	int cps; /* correction per invocation of drbd_rs_controller() */
441 	int steps; /* Number of time steps to plan ahead */
442 	int curr_corr;
443 	int max_sect;
444 
445 	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
446 	mdev->rs_in_flight -= sect_in;
447 
448 	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
449 
450 	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
451 
452 	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
453 		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
454 	} else { /* normal path */
455 		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
456 			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
457 	}
458 
459 	correction = want - mdev->rs_in_flight - mdev->rs_planed;
460 
461 	/* Plan ahead */
462 	cps = correction / steps;
463 	fifo_add_val(&mdev->rs_plan_s, cps);
464 	mdev->rs_planed += cps * steps;
465 
466 	/* What we do in this step */
467 	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
468 	spin_unlock(&mdev->peer_seq_lock);
469 	mdev->rs_planed -= curr_corr;
470 
471 	req_sect = sect_in + curr_corr;
472 	if (req_sect < 0)
473 		req_sect = 0;
474 
475 	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
476 	if (req_sect > max_sect)
477 		req_sect = max_sect;
478 
479 	/*
480 	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
481 		 sect_in, mdev->rs_in_flight, want, correction,
482 		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
483 	*/
484 
485 	return req_sect;
486 }
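/* Worked example (editorial, assuming SLEEP_TIME is the usual 100ms, i.e.
 * HZ/10, and that sync_conf.rate is in KiB/s): at the start of a resync
 * with rate = 10000 KiB/s and a plan of steps = 10, the code wants
 * 10000 * 2 * 0.1 = 2000 sectors in flight per step, i.e. want = 20000
 * sectors for the whole plan window.  The difference between that and what
 * is already in flight or planned is spread evenly over the plan FIFO (cps),
 * and the slot popped for the current step (curr_corr) is added to the
 * sectors that just came in to form this step's request budget.
 */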
487 
static int drbd_rs_number_requests(struct drbd_conf *mdev)
489 {
490 	int number;
491 	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
492 		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
493 		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
494 	} else {
495 		mdev->c_sync_rate = mdev->sync_conf.rate;
496 		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
497 	}
498 
	/* ignore the number of pending requests; the resync controller should
	 * throttle down to the incoming reply rate soon enough anyway. */
501 	return number;
502 }
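/* Worked example (editorial, same SLEEP_TIME assumption as above): with the
 * controller disabled and a configured rate of 4000 KiB/s, one 100ms slice
 * may issue 4000 * 0.1 / 4 = 100 resync requests of BM_BLOCK_SIZE (4 KiB)
 * each.  With the controller enabled the conversion runs the other way:
 * sectors >> (BM_BLOCK_SHIFT - 9) turns the controller's sector budget into
 * a number of 4 KiB requests, and c_sync_rate is recomputed from it,
 * essentially for status output.
 */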
503 
static int w_make_resync_request(struct drbd_conf *mdev,
				 struct drbd_work *w, int cancel)
506 {
507 	unsigned long bit;
508 	sector_t sector;
509 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
510 	int max_bio_size;
511 	int number, rollback_i, size;
512 	int align, queued, sndbuf;
513 	int i = 0;
514 
515 	if (unlikely(cancel))
516 		return 1;
517 
518 	if (mdev->rs_total == 0) {
519 		/* empty resync? */
520 		drbd_resync_finished(mdev);
521 		return 1;
522 	}
523 
524 	if (!get_ldev(mdev)) {
		/* Since we only need to access mdev->rsync, a
		   get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
		   continuing the resync with a broken disk makes no sense at
		   all */
529 		dev_err(DEV, "Disk broke down during resync!\n");
530 		return 1;
531 	}
532 
533 	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
534 	 * if it should be necessary */
535 	max_bio_size =
536 		mdev->agreed_pro_version < 94 ? queue_max_hw_sectors(mdev->rq_queue) << 9 :
537 		mdev->agreed_pro_version < 95 ?	DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_BIO_SIZE;
538 
539 	number = drbd_rs_number_requests(mdev);
540 	if (number == 0)
541 		goto requeue;
542 
543 	for (i = 0; i < number; i++) {
		/* Stop generating RS requests when half of the send buffer is filled */
545 		mutex_lock(&mdev->data.mutex);
546 		if (mdev->data.socket) {
547 			queued = mdev->data.socket->sk->sk_wmem_queued;
548 			sndbuf = mdev->data.socket->sk->sk_sndbuf;
549 		} else {
550 			queued = 1;
551 			sndbuf = 0;
552 		}
553 		mutex_unlock(&mdev->data.mutex);
554 		if (queued > sndbuf / 2)
555 			goto requeue;
556 
557 next_sector:
558 		size = BM_BLOCK_SIZE;
559 		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
560 
561 		if (bit == DRBD_END_OF_BITMAP) {
562 			mdev->bm_resync_fo = drbd_bm_bits(mdev);
563 			put_ldev(mdev);
564 			return 1;
565 		}
566 
567 		sector = BM_BIT_TO_SECT(bit);
568 
569 		if (drbd_rs_should_slow_down(mdev, sector) ||
570 		    drbd_try_rs_begin_io(mdev, sector)) {
571 			mdev->bm_resync_fo = bit;
572 			goto requeue;
573 		}
574 		mdev->bm_resync_fo = bit + 1;
575 
576 		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
577 			drbd_rs_complete_io(mdev, sector);
578 			goto next_sector;
579 		}
580 
581 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
		/* try to find some adjacent bits.
		 * we stop once we already have the maximum request size.
		 *
		 * Additionally, always align bigger requests, in order to
		 * be prepared for all stripe sizes of software RAIDs.
		 */
588 		align = 1;
589 		rollback_i = i;
590 		for (;;) {
591 			if (size + BM_BLOCK_SIZE > max_bio_size)
592 				break;
593 
594 			/* Be always aligned */
595 			if (sector & ((1<<(align+3))-1))
596 				break;
597 
598 			/* do not cross extent boundaries */
599 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
600 				break;
601 			/* now, is it actually dirty, after all?
602 			 * caution, drbd_bm_test_bit is tri-state for some
603 			 * obscure reason; ( b == 0 ) would get the out-of-band
604 			 * only accidentally right because of the "oddly sized"
605 			 * adjustment below */
606 			if (drbd_bm_test_bit(mdev, bit+1) != 1)
607 				break;
608 			bit++;
609 			size += BM_BLOCK_SIZE;
610 			if ((BM_BLOCK_SIZE << align) <= size)
611 				align++;
612 			i++;
613 		}
614 		/* if we merged some,
615 		 * reset the offset to start the next drbd_bm_find_next from */
616 		if (size > BM_BLOCK_SIZE)
617 			mdev->bm_resync_fo = bit + 1;
618 #endif
619 
620 		/* adjust very last sectors, in case we are oddly sized */
621 		if (sector + (size>>9) > capacity)
622 			size = (capacity-sector)<<9;
623 		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
624 			switch (read_for_csum(mdev, sector, size)) {
625 			case -EIO: /* Disk failure */
626 				put_ldev(mdev);
627 				return 0;
628 			case -EAGAIN: /* allocation failed, or ldev busy */
629 				drbd_rs_complete_io(mdev, sector);
630 				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
631 				i = rollback_i;
632 				goto requeue;
633 			case 0:
634 				/* everything ok */
635 				break;
636 			default:
637 				BUG();
638 			}
639 		} else {
640 			inc_rs_pending(mdev);
641 			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
642 					       sector, size, ID_SYNCER)) {
643 				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
644 				dec_rs_pending(mdev);
645 				put_ldev(mdev);
646 				return 0;
647 			}
648 		}
649 	}
650 
651 	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
652 		/* last syncer _request_ was sent,
653 		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
654 		 * next sync group will resume), as soon as we receive the last
655 		 * resync data block, and the last bit is cleared.
656 		 * until then resync "work" is "inactive" ...
657 		 */
658 		put_ldev(mdev);
659 		return 1;
660 	}
661 
662  requeue:
663 	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
664 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
665 	put_ldev(mdev);
666 	return 1;
667 }
668 
static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
670 {
671 	int number, i, size;
672 	sector_t sector;
673 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
674 
675 	if (unlikely(cancel))
676 		return 1;
677 
678 	number = drbd_rs_number_requests(mdev);
679 
680 	sector = mdev->ov_position;
681 	for (i = 0; i < number; i++) {
682 		if (sector >= capacity) {
683 			return 1;
684 		}
685 
686 		size = BM_BLOCK_SIZE;
687 
688 		if (drbd_rs_should_slow_down(mdev, sector) ||
689 		    drbd_try_rs_begin_io(mdev, sector)) {
690 			mdev->ov_position = sector;
691 			goto requeue;
692 		}
693 
694 		if (sector + (size>>9) > capacity)
695 			size = (capacity-sector)<<9;
696 
697 		inc_rs_pending(mdev);
698 		if (!drbd_send_ov_request(mdev, sector, size)) {
699 			dec_rs_pending(mdev);
700 			return 0;
701 		}
702 		sector += BM_SECT_PER_BIT;
703 	}
704 	mdev->ov_position = sector;
705 
706  requeue:
707 	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
708 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
709 	return 1;
710 }
711 
712 
void start_resync_timer_fn(unsigned long data)
714 {
715 	struct drbd_conf *mdev = (struct drbd_conf *) data;
716 
717 	drbd_queue_work(&mdev->data.work, &mdev->start_resync_work);
718 }
719 
int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
721 {
722 	if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
723 		dev_warn(DEV, "w_start_resync later...\n");
724 		mdev->start_resync_timer.expires = jiffies + HZ/10;
725 		add_timer(&mdev->start_resync_timer);
726 		return 1;
727 	}
728 
729 	drbd_start_resync(mdev, C_SYNC_SOURCE);
730 	clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
731 	return 1;
732 }
733 
int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
735 {
736 	kfree(w);
737 	ov_oos_print(mdev);
738 	drbd_resync_finished(mdev);
739 
740 	return 1;
741 }
742 
static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
744 {
745 	kfree(w);
746 
747 	drbd_resync_finished(mdev);
748 
749 	return 1;
750 }
751 
static void ping_peer(struct drbd_conf *mdev)
753 {
754 	clear_bit(GOT_PING_ACK, &mdev->flags);
755 	request_ping(mdev);
756 	wait_event(mdev->misc_wait,
757 		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
758 }
759 
int drbd_resync_finished(struct drbd_conf *mdev)
761 {
762 	unsigned long db, dt, dbdt;
763 	unsigned long n_oos;
764 	union drbd_state os, ns;
765 	struct drbd_work *w;
766 	char *khelper_cmd = NULL;
767 	int verify_done = 0;
768 
769 	/* Remove all elements from the resync LRU. Since future actions
770 	 * might set bits in the (main) bitmap, then the entries in the
771 	 * resync LRU would be wrong. */
772 	if (drbd_rs_del_all(mdev)) {
		/* In case this is not possible now, most probably because
		 * there are P_RS_DATA_REPLY packets lingering on the worker's
		 * queue (or even the read operations for those packets
		 * are not finished by now).  Retry in 100ms. */
777 
778 		schedule_timeout_interruptible(HZ / 10);
779 		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
780 		if (w) {
781 			w->cb = w_resync_finished;
782 			drbd_queue_work(&mdev->data.work, w);
783 			return 1;
784 		}
785 		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
786 	}
787 
788 	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
789 	if (dt <= 0)
790 		dt = 1;
791 	db = mdev->rs_total;
792 	dbdt = Bit2KB(db/dt);
793 	mdev->rs_paused /= HZ;
794 
795 	if (!get_ldev(mdev))
796 		goto out;
797 
798 	ping_peer(mdev);
799 
800 	spin_lock_irq(&mdev->req_lock);
801 	os = mdev->state;
802 
803 	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
804 
805 	/* This protects us against multiple calls (that can happen in the presence
806 	   of application IO), and against connectivity loss just before we arrive here. */
807 	if (os.conn <= C_CONNECTED)
808 		goto out_unlock;
809 
810 	ns = os;
811 	ns.conn = C_CONNECTED;
812 
813 	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
814 	     verify_done ? "Online verify " : "Resync",
815 	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
816 
817 	n_oos = drbd_bm_total_weight(mdev);
818 
819 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
820 		if (n_oos) {
821 			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
822 			      n_oos, Bit2KB(1));
823 			khelper_cmd = "out-of-sync";
824 		}
825 	} else {
826 		D_ASSERT((n_oos - mdev->rs_failed) == 0);
827 
828 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
829 			khelper_cmd = "after-resync-target";
830 
831 		if (mdev->csums_tfm && mdev->rs_total) {
832 			const unsigned long s = mdev->rs_same_csum;
833 			const unsigned long t = mdev->rs_total;
834 			const int ratio =
835 				(t == 0)     ? 0 :
836 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
837 			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
838 			     "transferred %luK total %luK\n",
839 			     ratio,
840 			     Bit2KB(mdev->rs_same_csum),
841 			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
842 			     Bit2KB(mdev->rs_total));
843 		}
844 	}
845 
846 	if (mdev->rs_failed) {
847 		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
848 
849 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
850 			ns.disk = D_INCONSISTENT;
851 			ns.pdsk = D_UP_TO_DATE;
852 		} else {
853 			ns.disk = D_UP_TO_DATE;
854 			ns.pdsk = D_INCONSISTENT;
855 		}
856 	} else {
857 		ns.disk = D_UP_TO_DATE;
858 		ns.pdsk = D_UP_TO_DATE;
859 
860 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
861 			if (mdev->p_uuid) {
862 				int i;
863 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
864 					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
865 				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
866 				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
867 			} else {
868 				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
869 			}
870 		}
871 
872 		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
873 			/* for verify runs, we don't update uuids here,
874 			 * so there would be nothing to report. */
875 			drbd_uuid_set_bm(mdev, 0UL);
876 			drbd_print_uuids(mdev, "updated UUIDs");
877 			if (mdev->p_uuid) {
878 				/* Now the two UUID sets are equal, update what we
879 				 * know of the peer. */
880 				int i;
881 				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
882 					mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
883 			}
884 		}
885 	}
886 
887 	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
888 out_unlock:
889 	spin_unlock_irq(&mdev->req_lock);
890 	put_ldev(mdev);
891 out:
892 	mdev->rs_total  = 0;
893 	mdev->rs_failed = 0;
894 	mdev->rs_paused = 0;
895 	if (verify_done)
896 		mdev->ov_start_sector = 0;
897 
898 	drbd_md_sync(mdev);
899 
900 	if (khelper_cmd)
901 		drbd_khelper(mdev, khelper_cmd);
902 
903 	return 1;
904 }
905 
906 /* helper */
static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
908 {
909 	if (drbd_ee_has_active_page(e)) {
910 		/* This might happen if sendpage() has not finished */
911 		int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
912 		atomic_add(i, &mdev->pp_in_use_by_net);
913 		atomic_sub(i, &mdev->pp_in_use);
914 		spin_lock_irq(&mdev->req_lock);
915 		list_add_tail(&e->w.list, &mdev->net_ee);
916 		spin_unlock_irq(&mdev->req_lock);
917 		wake_up(&drbd_pp_wait);
918 	} else
919 		drbd_free_ee(mdev, e);
920 }
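/* Editorial note: "i" above is the number of pages backing the entry,
 * rounded up, e.g. a 4 KiB entry accounts for exactly one page.  Those
 * pages are only moved from pp_in_use to pp_in_use_by_net; they are
 * presumably given back once the network layer no longer references them
 * and the net_ee list is cleaned up.
 */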
921 
922 /**
923  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
924  * @mdev:	DRBD device.
925  * @w:		work object.
 * @cancel:	The connection will be closed anyway
927  */
int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
929 {
930 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
931 	int ok;
932 
933 	if (unlikely(cancel)) {
934 		drbd_free_ee(mdev, e);
935 		dec_unacked(mdev);
936 		return 1;
937 	}
938 
939 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
940 		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
941 	} else {
942 		if (__ratelimit(&drbd_ratelimit_state))
943 			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
944 			    (unsigned long long)e->sector);
945 
946 		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
947 	}
948 
949 	dec_unacked(mdev);
950 
951 	move_to_net_ee_or_free(mdev, e);
952 
953 	if (unlikely(!ok))
954 		dev_err(DEV, "drbd_send_block() failed\n");
955 	return ok;
956 }
957 
958 /**
 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
960  * @mdev:	DRBD device.
961  * @w:		work object.
 * @cancel:	The connection will be closed anyway
963  */
int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
965 {
966 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
967 	int ok;
968 
969 	if (unlikely(cancel)) {
970 		drbd_free_ee(mdev, e);
971 		dec_unacked(mdev);
972 		return 1;
973 	}
974 
975 	if (get_ldev_if_state(mdev, D_FAILED)) {
976 		drbd_rs_complete_io(mdev, e->sector);
977 		put_ldev(mdev);
978 	}
979 
980 	if (mdev->state.conn == C_AHEAD) {
981 		ok = drbd_send_ack(mdev, P_RS_CANCEL, e);
982 	} else if (likely((e->flags & EE_WAS_ERROR) == 0)) {
983 		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
984 			inc_rs_pending(mdev);
985 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
986 		} else {
987 			if (__ratelimit(&drbd_ratelimit_state))
988 				dev_err(DEV, "Not sending RSDataReply, "
989 				    "partner DISKLESS!\n");
990 			ok = 1;
991 		}
992 	} else {
993 		if (__ratelimit(&drbd_ratelimit_state))
994 			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
995 			    (unsigned long long)e->sector);
996 
997 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
998 
999 		/* update resync data with failure */
1000 		drbd_rs_failed_io(mdev, e->sector, e->size);
1001 	}
1002 
1003 	dec_unacked(mdev);
1004 
1005 	move_to_net_ee_or_free(mdev, e);
1006 
1007 	if (unlikely(!ok))
1008 		dev_err(DEV, "drbd_send_block() failed\n");
1009 	return ok;
1010 }
1011 
int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1013 {
1014 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1015 	struct digest_info *di;
1016 	int digest_size;
1017 	void *digest = NULL;
1018 	int ok, eq = 0;
1019 
1020 	if (unlikely(cancel)) {
1021 		drbd_free_ee(mdev, e);
1022 		dec_unacked(mdev);
1023 		return 1;
1024 	}
1025 
1026 	if (get_ldev(mdev)) {
1027 		drbd_rs_complete_io(mdev, e->sector);
1028 		put_ldev(mdev);
1029 	}
1030 
1031 	di = e->digest;
1032 
1033 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1034 		/* quick hack to try to avoid a race against reconfiguration.
1035 		 * a real fix would be much more involved,
1036 		 * introducing more locking mechanisms */
1037 		if (mdev->csums_tfm) {
1038 			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1039 			D_ASSERT(digest_size == di->digest_size);
1040 			digest = kmalloc(digest_size, GFP_NOIO);
1041 		}
1042 		if (digest) {
1043 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1044 			eq = !memcmp(digest, di->digest, digest_size);
1045 			kfree(digest);
1046 		}
1047 
1048 		if (eq) {
1049 			drbd_set_in_sync(mdev, e->sector, e->size);
1050 			/* rs_same_csums unit is BM_BLOCK_SIZE */
1051 			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1052 			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1053 		} else {
1054 			inc_rs_pending(mdev);
1055 			e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1056 			e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1057 			kfree(di);
1058 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1059 		}
1060 	} else {
1061 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1062 		if (__ratelimit(&drbd_ratelimit_state))
1063 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1064 	}
1065 
1066 	dec_unacked(mdev);
1067 	move_to_net_ee_or_free(mdev, e);
1068 
1069 	if (unlikely(!ok))
1070 		dev_err(DEV, "drbd_send_block/ack() failed\n");
1071 	return ok;
1072 }
1073 
int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1075 {
1076 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1077 	int digest_size;
1078 	void *digest;
1079 	int ok = 1;
1080 
1081 	if (unlikely(cancel))
1082 		goto out;
1083 
1084 	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1085 	digest = kmalloc(digest_size, GFP_NOIO);
1086 	if (!digest) {
1087 		ok = 0;	/* terminate the connection in case the allocation failed */
1088 		goto out;
1089 	}
1090 
1091 	if (likely(!(e->flags & EE_WAS_ERROR)))
1092 		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1093 	else
1094 		memset(digest, 0, digest_size);
1095 
1096 	inc_rs_pending(mdev);
1097 	ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1098 				     digest, digest_size, P_OV_REPLY);
1099 	if (!ok)
1100 		dec_rs_pending(mdev);
1101 	kfree(digest);
1102 
1103 out:
1104 	drbd_free_ee(mdev, e);
1105 	dec_unacked(mdev);
1106 
1107 	return ok;
1108 }
1109 
void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1111 {
1112 	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1113 		mdev->ov_last_oos_size += size>>9;
1114 	} else {
1115 		mdev->ov_last_oos_start = sector;
1116 		mdev->ov_last_oos_size = size>>9;
1117 	}
1118 	drbd_set_out_of_sync(mdev, sector, size);
1119 }
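/* Editorial example: with ov_last_oos_start == 1000 and ov_last_oos_size
 * == 8 (sectors), a 4 KiB mismatch reported at sector 1008 merely extends
 * the recorded range to 16 sectors, while a mismatch at sector 2048 starts
 * a new range.  This lets ov_oos_print() report contiguous out-of-sync
 * regions instead of individual blocks.
 */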
1120 
int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1122 {
1123 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1124 	struct digest_info *di;
1125 	int digest_size;
1126 	void *digest;
1127 	int ok, eq = 0;
1128 
1129 	if (unlikely(cancel)) {
1130 		drbd_free_ee(mdev, e);
1131 		dec_unacked(mdev);
1132 		return 1;
1133 	}
1134 
1135 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1136 	 * the resync lru has been cleaned up already */
1137 	if (get_ldev(mdev)) {
1138 		drbd_rs_complete_io(mdev, e->sector);
1139 		put_ldev(mdev);
1140 	}
1141 
1142 	di = e->digest;
1143 
1144 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1145 		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1146 		digest = kmalloc(digest_size, GFP_NOIO);
1147 		if (digest) {
1148 			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1149 
1150 			D_ASSERT(digest_size == di->digest_size);
1151 			eq = !memcmp(digest, di->digest, digest_size);
1152 			kfree(digest);
1153 		}
1154 	}
1155 
1156 	dec_unacked(mdev);
1157 	if (!eq)
1158 		drbd_ov_oos_found(mdev, e->sector, e->size);
1159 	else
1160 		ov_oos_print(mdev);
1161 
1162 	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1163 			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1164 
1165 	drbd_free_ee(mdev, e);
1166 
1167 	--mdev->ov_left;
1168 
1169 	/* let's advance progress step marks only for every other megabyte */
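	/* (0x200 remaining blocks == 512 * BM_BLOCK_SIZE == 2 MiB with 4 KiB
	 * blocks; testing that bit means the marks are only touched during
	 * every other such chunk of the remaining verify work) */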
1170 	if ((mdev->ov_left & 0x200) == 0x200)
1171 		drbd_advance_rs_marks(mdev, mdev->ov_left);
1172 
1173 	if (mdev->ov_left == 0) {
1174 		ov_oos_print(mdev);
1175 		drbd_resync_finished(mdev);
1176 	}
1177 
1178 	return ok;
1179 }
1180 
int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1182 {
1183 	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1184 	complete(&b->done);
1185 	return 1;
1186 }
1187 
int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1189 {
1190 	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1191 	struct p_barrier *p = &mdev->data.sbuf.barrier;
1192 	int ok = 1;
1193 
1194 	/* really avoid racing with tl_clear.  w.cb may have been referenced
1195 	 * just before it was reassigned and re-queued, so double check that.
1196 	 * actually, this race was harmless, since we only try to send the
1197 	 * barrier packet here, and otherwise do nothing with the object.
1198 	 * but compare with the head of w_clear_epoch */
1199 	spin_lock_irq(&mdev->req_lock);
1200 	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1201 		cancel = 1;
1202 	spin_unlock_irq(&mdev->req_lock);
1203 	if (cancel)
1204 		return 1;
1205 
1206 	if (!drbd_get_data_sock(mdev))
1207 		return 0;
1208 	p->barrier = b->br_number;
1209 	/* inc_ap_pending was done where this was queued.
1210 	 * dec_ap_pending will be done in got_BarrierAck
1211 	 * or (on connection loss) in w_clear_epoch.  */
1212 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1213 				(struct p_header80 *)p, sizeof(*p), 0);
1214 	drbd_put_data_sock(mdev);
1215 
1216 	return ok;
1217 }
1218 
int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1220 {
1221 	if (cancel)
1222 		return 1;
1223 	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1224 }
1225 
int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1227 {
1228 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1229 	int ok;
1230 
1231 	if (unlikely(cancel)) {
1232 		req_mod(req, send_canceled);
1233 		return 1;
1234 	}
1235 
1236 	ok = drbd_send_oos(mdev, req);
1237 	req_mod(req, oos_handed_to_network);
1238 
1239 	return ok;
1240 }
1241 
1242 /**
1243  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1244  * @mdev:	DRBD device.
1245  * @w:		work object.
 * @cancel:	The connection will be closed anyway
1247  */
int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1249 {
1250 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1251 	int ok;
1252 
1253 	if (unlikely(cancel)) {
1254 		req_mod(req, send_canceled);
1255 		return 1;
1256 	}
1257 
1258 	ok = drbd_send_dblock(mdev, req);
1259 	req_mod(req, ok ? handed_over_to_network : send_failed);
1260 
1261 	return ok;
1262 }
1263 
1264 /**
1265  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1266  * @mdev:	DRBD device.
1267  * @w:		work object.
 * @cancel:	The connection will be closed anyway
1269  */
int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1271 {
1272 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1273 	int ok;
1274 
1275 	if (unlikely(cancel)) {
1276 		req_mod(req, send_canceled);
1277 		return 1;
1278 	}
1279 
1280 	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1281 				(unsigned long)req);
1282 
1283 	if (!ok) {
1284 		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1285 		 * so this is probably redundant */
1286 		if (mdev->state.conn >= C_CONNECTED)
1287 			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1288 	}
1289 	req_mod(req, ok ? handed_over_to_network : send_failed);
1290 
1291 	return ok;
1292 }
1293 
int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1295 {
1296 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1297 
1298 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1299 		drbd_al_begin_io(mdev, req->sector);
	/* Calling drbd_al_begin_io() out of the worker might deadlock
	   theoretically. Practically it cannot deadlock, since this is
	   only used when unfreezing IOs. All the extents of the requests
	   that made it into the TL are already active */
1304 
1305 	drbd_req_make_private_bio(req, req->master_bio);
1306 	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1307 	generic_make_request(req->private_bio);
1308 
1309 	return 1;
1310 }
1311 
static int _drbd_may_sync_now(struct drbd_conf *mdev)
1313 {
1314 	struct drbd_conf *odev = mdev;
1315 
1316 	while (1) {
1317 		if (odev->sync_conf.after == -1)
1318 			return 1;
1319 		odev = minor_to_mdev(odev->sync_conf.after);
1320 		ERR_IF(!odev) return 1;
1321 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1322 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1323 		    odev->state.aftr_isp || odev->state.peer_isp ||
1324 		    odev->state.user_isp)
1325 			return 0;
1326 	}
1327 }
1328 
1329 /**
1330  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1331  * @mdev:	DRBD device.
1332  *
1333  * Called from process context only (admin command and after_state_ch).
1334  */
static int _drbd_pause_after(struct drbd_conf *mdev)
1336 {
1337 	struct drbd_conf *odev;
1338 	int i, rv = 0;
1339 
1340 	for (i = 0; i < minor_count; i++) {
1341 		odev = minor_to_mdev(i);
1342 		if (!odev)
1343 			continue;
1344 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1345 			continue;
1346 		if (!_drbd_may_sync_now(odev))
1347 			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1348 			       != SS_NOTHING_TO_DO);
1349 	}
1350 
1351 	return rv;
1352 }
1353 
1354 /**
1355  * _drbd_resume_next() - Resume resync on all devices that may resync now
1356  * @mdev:	DRBD device.
1357  *
1358  * Called from process context only (admin command and worker).
1359  */
static int _drbd_resume_next(struct drbd_conf *mdev)
1361 {
1362 	struct drbd_conf *odev;
1363 	int i, rv = 0;
1364 
1365 	for (i = 0; i < minor_count; i++) {
1366 		odev = minor_to_mdev(i);
1367 		if (!odev)
1368 			continue;
1369 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1370 			continue;
1371 		if (odev->state.aftr_isp) {
1372 			if (_drbd_may_sync_now(odev))
1373 				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1374 							CS_HARD, NULL)
1375 				       != SS_NOTHING_TO_DO) ;
1376 		}
1377 	}
1378 	return rv;
1379 }
1380 
void resume_next_sg(struct drbd_conf *mdev)
1382 {
1383 	write_lock_irq(&global_state_lock);
1384 	_drbd_resume_next(mdev);
1385 	write_unlock_irq(&global_state_lock);
1386 }
1387 
void suspend_other_sg(struct drbd_conf *mdev)
1389 {
1390 	write_lock_irq(&global_state_lock);
1391 	_drbd_pause_after(mdev);
1392 	write_unlock_irq(&global_state_lock);
1393 }
1394 
static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1396 {
1397 	struct drbd_conf *odev;
1398 
1399 	if (o_minor == -1)
1400 		return NO_ERROR;
1401 	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1402 		return ERR_SYNC_AFTER;
1403 
1404 	/* check for loops */
1405 	odev = minor_to_mdev(o_minor);
1406 	while (1) {
1407 		if (odev == mdev)
1408 			return ERR_SYNC_AFTER_CYCLE;
1409 
1410 		/* dependency chain ends here, no cycles. */
1411 		if (odev->sync_conf.after == -1)
1412 			return NO_ERROR;
1413 
1414 		/* follow the dependency chain */
1415 		odev = minor_to_mdev(odev->sync_conf.after);
1416 	}
1417 }
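/* Editorial example: if minor 0 is configured to sync after minor 1 and
 * minor 1 already syncs after minor 0, the walk above reaches mdev again
 * and returns ERR_SYNC_AFTER_CYCLE, so drbd_alter_sa() below refuses the
 * new dependency before it is stored.
 */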
1418 
int drbd_alter_sa(struct drbd_conf *mdev, int na)
1420 {
1421 	int changes;
1422 	int retcode;
1423 
1424 	write_lock_irq(&global_state_lock);
1425 	retcode = sync_after_error(mdev, na);
1426 	if (retcode == NO_ERROR) {
1427 		mdev->sync_conf.after = na;
1428 		do {
1429 			changes  = _drbd_pause_after(mdev);
1430 			changes |= _drbd_resume_next(mdev);
1431 		} while (changes);
1432 	}
1433 	write_unlock_irq(&global_state_lock);
1434 	return retcode;
1435 }
1436 
void drbd_rs_controller_reset(struct drbd_conf *mdev)
1438 {
1439 	atomic_set(&mdev->rs_sect_in, 0);
1440 	atomic_set(&mdev->rs_sect_ev, 0);
1441 	mdev->rs_in_flight = 0;
1442 	mdev->rs_planed = 0;
1443 	spin_lock(&mdev->peer_seq_lock);
1444 	fifo_set(&mdev->rs_plan_s, 0);
1445 	spin_unlock(&mdev->peer_seq_lock);
1446 }
1447 
1448 /**
1449  * drbd_start_resync() - Start the resync process
1450  * @mdev:	DRBD device.
1451  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1452  *
1453  * This function might bring you directly into one of the
1454  * C_PAUSED_SYNC_* states.
1455  */
void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1457 {
1458 	union drbd_state ns;
1459 	int r;
1460 
1461 	if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1462 		dev_err(DEV, "Resync already running!\n");
1463 		return;
1464 	}
1465 
1466 	if (mdev->state.conn < C_AHEAD) {
1467 		/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1468 		drbd_rs_cancel_all(mdev);
1469 		/* This should be done when we abort the resync. We definitely do not
1470 		   want to have this for connections going back and forth between
1471 		   Ahead/Behind and SyncSource/SyncTarget */
1472 	}
1473 
1474 	if (side == C_SYNC_TARGET) {
		/* Since application IO was locked out during C_WF_BITMAP_T and
		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
		   we ask the before-resync-target handler whether we may make the data inconsistent. */
1478 		r = drbd_khelper(mdev, "before-resync-target");
1479 		r = (r >> 8) & 0xff;
1480 		if (r > 0) {
1481 			dev_info(DEV, "before-resync-target handler returned %d, "
1482 			     "dropping connection.\n", r);
1483 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1484 			return;
1485 		}
1486 	} else /* C_SYNC_SOURCE */ {
1487 		r = drbd_khelper(mdev, "before-resync-source");
1488 		r = (r >> 8) & 0xff;
1489 		if (r > 0) {
1490 			if (r == 3) {
1491 				dev_info(DEV, "before-resync-source handler returned %d, "
1492 					 "ignoring. Old userland tools?", r);
1493 			} else {
1494 				dev_info(DEV, "before-resync-source handler returned %d, "
1495 					 "dropping connection.\n", r);
1496 				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1497 				return;
1498 			}
1499 		}
1500 	}
1501 
1502 	drbd_state_lock(mdev);
1503 
1504 	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1505 		drbd_state_unlock(mdev);
1506 		return;
1507 	}
1508 
1509 	write_lock_irq(&global_state_lock);
1510 	ns = mdev->state;
1511 
1512 	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1513 
1514 	ns.conn = side;
1515 
1516 	if (side == C_SYNC_TARGET)
1517 		ns.disk = D_INCONSISTENT;
1518 	else /* side == C_SYNC_SOURCE */
1519 		ns.pdsk = D_INCONSISTENT;
1520 
1521 	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1522 	ns = mdev->state;
1523 
1524 	if (ns.conn < C_CONNECTED)
1525 		r = SS_UNKNOWN_ERROR;
1526 
1527 	if (r == SS_SUCCESS) {
1528 		unsigned long tw = drbd_bm_total_weight(mdev);
1529 		unsigned long now = jiffies;
1530 		int i;
1531 
1532 		mdev->rs_failed    = 0;
1533 		mdev->rs_paused    = 0;
1534 		mdev->rs_same_csum = 0;
1535 		mdev->rs_last_events = 0;
1536 		mdev->rs_last_sect_ev = 0;
1537 		mdev->rs_total     = tw;
1538 		mdev->rs_start     = now;
1539 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1540 			mdev->rs_mark_left[i] = tw;
1541 			mdev->rs_mark_time[i] = now;
1542 		}
1543 		_drbd_pause_after(mdev);
1544 	}
1545 	write_unlock_irq(&global_state_lock);
1546 
1547 	if (r == SS_SUCCESS) {
1548 		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1549 		     drbd_conn_str(ns.conn),
1550 		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1551 		     (unsigned long) mdev->rs_total);
1552 		if (side == C_SYNC_TARGET)
1553 			mdev->bm_resync_fo = 0;
1554 
		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
		 * with w_send_oos, or the sync target will get confused as to
		 * how many bits to resync.  We cannot always do that, because for an
		 * empty resync and protocol < 95, we need to do it here, as we call
		 * drbd_resync_finished from here in that case.
		 * We call drbd_gen_and_send_sync_uuid() here for protocol < 96,
		 * and from after_state_ch otherwise. */
1562 		if (side == C_SYNC_SOURCE && mdev->agreed_pro_version < 96)
1563 			drbd_gen_and_send_sync_uuid(mdev);
1564 
1565 		if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1566 			/* This still has a race (about when exactly the peers
1567 			 * detect connection loss) that can lead to a full sync
1568 			 * on next handshake. In 8.3.9 we fixed this with explicit
1569 			 * resync-finished notifications, but the fix
1570 			 * introduces a protocol change.  Sleeping for some
1571 			 * time longer than the ping interval + timeout on the
1572 			 * SyncSource, to give the SyncTarget the chance to
1573 			 * detect connection loss, then waiting for a ping
1574 			 * response (implicit in drbd_resync_finished) reduces
1575 			 * the race considerably, but does not solve it. */
1576 			if (side == C_SYNC_SOURCE)
1577 				schedule_timeout_interruptible(
1578 					mdev->net_conf->ping_int * HZ +
1579 					mdev->net_conf->ping_timeo*HZ/9);
1580 			drbd_resync_finished(mdev);
1581 		}
1582 
1583 		drbd_rs_controller_reset(mdev);
1584 		/* ns.conn may already be != mdev->state.conn,
1585 		 * we may have been paused in between, or become paused until
1586 		 * the timer triggers.
1587 		 * No matter, that is handled in resync_timer_fn() */
1588 		if (ns.conn == C_SYNC_TARGET)
1589 			mod_timer(&mdev->resync_timer, jiffies);
1590 
1591 		drbd_md_sync(mdev);
1592 	}
1593 	put_ldev(mdev);
1594 	drbd_state_unlock(mdev);
1595 }
1596 
int drbd_worker(struct drbd_thread *thi)
1598 {
1599 	struct drbd_conf *mdev = thi->mdev;
1600 	struct drbd_work *w = NULL;
1601 	LIST_HEAD(work_list);
1602 	int intr = 0, i;
1603 
1604 	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1605 
1606 	while (get_t_state(thi) == Running) {
1607 		drbd_thread_current_set_cpu(mdev);
1608 
1609 		if (down_trylock(&mdev->data.work.s)) {
1610 			mutex_lock(&mdev->data.mutex);
1611 			if (mdev->data.socket && !mdev->net_conf->no_cork)
1612 				drbd_tcp_uncork(mdev->data.socket);
1613 			mutex_unlock(&mdev->data.mutex);
1614 
1615 			intr = down_interruptible(&mdev->data.work.s);
1616 
1617 			mutex_lock(&mdev->data.mutex);
1618 			if (mdev->data.socket  && !mdev->net_conf->no_cork)
1619 				drbd_tcp_cork(mdev->data.socket);
1620 			mutex_unlock(&mdev->data.mutex);
1621 		}
1622 
1623 		if (intr) {
1624 			D_ASSERT(intr == -EINTR);
1625 			flush_signals(current);
1626 			ERR_IF (get_t_state(thi) == Running)
1627 				continue;
1628 			break;
1629 		}
1630 
1631 		if (get_t_state(thi) != Running)
1632 			break;
1633 		/* With this break, we have done a down() but not consumed
1634 		   the entry from the list. The cleanup code takes care of
1635 		   this...   */
1636 
1637 		w = NULL;
1638 		spin_lock_irq(&mdev->data.work.q_lock);
1639 		ERR_IF(list_empty(&mdev->data.work.q)) {
1640 			/* something terribly wrong in our logic.
1641 			 * we were able to down() the semaphore,
1642 			 * but the list is empty... doh.
1643 			 *
1644 			 * what is the best thing to do now?
1645 			 * try again from scratch, restarting the receiver,
1646 			 * asender, whatnot? could break even more ugly,
1647 			 * e.g. when we are primary, but no good local data.
1648 			 *
1649 			 * I'll try to get away just starting over this loop.
1650 			 */
1651 			spin_unlock_irq(&mdev->data.work.q_lock);
1652 			continue;
1653 		}
1654 		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1655 		list_del_init(&w->list);
1656 		spin_unlock_irq(&mdev->data.work.q_lock);
1657 
1658 		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1659 			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1660 			if (mdev->state.conn >= C_CONNECTED)
1661 				drbd_force_state(mdev,
1662 						NS(conn, C_NETWORK_FAILURE));
1663 		}
1664 	}
1665 	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1666 	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1667 
1668 	spin_lock_irq(&mdev->data.work.q_lock);
1669 	i = 0;
1670 	while (!list_empty(&mdev->data.work.q)) {
1671 		list_splice_init(&mdev->data.work.q, &work_list);
1672 		spin_unlock_irq(&mdev->data.work.q_lock);
1673 
1674 		while (!list_empty(&work_list)) {
1675 			w = list_entry(work_list.next, struct drbd_work, list);
1676 			list_del_init(&w->list);
1677 			w->cb(mdev, w, 1);
1678 			i++; /* dead debugging code */
1679 		}
1680 
1681 		spin_lock_irq(&mdev->data.work.q_lock);
1682 	}
1683 	sema_init(&mdev->data.work.s, 0);
	/* DANGEROUS race: if someone queued their work within the spinlock,
	 * but up()ed outside the spinlock, we could get an up() on the
	 * semaphore without a corresponding list entry.
	 * So don't do that.
	 */
1689 	spin_unlock_irq(&mdev->data.work.q_lock);
1690 
1691 	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1692 	/* _drbd_set_state only uses stop_nowait.
1693 	 * wait here for the Exiting receiver. */
1694 	drbd_thread_stop(&mdev->receiver);
1695 	drbd_mdev_cleanup(mdev);
1696 
1697 	dev_info(DEV, "worker terminated\n");
1698 
1699 	clear_bit(DEVICE_DYING, &mdev->flags);
1700 	clear_bit(CONFIG_PENDING, &mdev->flags);
1701 	wake_up(&mdev->state_wait);
1702 
1703 	return 0;
1704 }
1705