1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48 
49 #include "drbd_vli.h"
50 
51 enum finish_epoch {
52 	FE_STILL_LIVE,
53 	FE_DESTROYED,
54 	FE_RECYCLED,
55 };
56 
57 static int drbd_do_handshake(struct drbd_conf *mdev);
58 static int drbd_do_auth(struct drbd_conf *mdev);
59 
60 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
61 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
62 
63 
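/* opportunistic page allocations that are allowed to fail quietly;
 * callers of alloc_page(GFP_TRY) cope with NULL
 * (see drbd_pp_first_pages_or_try_alloc below) */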
64 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
65 
66 /*
67  * some helper functions to deal with single linked page lists,
68  * page->private being our "next" pointer.
69  */
70 
71 /* If at least n pages are linked at head, get n pages off.
72  * Otherwise, don't modify head, and return NULL.
73  * Locking is the responsibility of the caller.
74  */
75 static struct page *page_chain_del(struct page **head, int n)
76 {
77 	struct page *page;
78 	struct page *tmp;
79 
80 	BUG_ON(!n);
81 	BUG_ON(!head);
82 
83 	page = *head;
84 
85 	if (!page)
86 		return NULL;
87 
88 	while (page) {
89 		tmp = page_chain_next(page);
90 		if (--n == 0)
91 			break; /* found sufficient pages */
92 		if (tmp == NULL)
93 			/* insufficient pages, don't use any of them. */
94 			return NULL;
95 		page = tmp;
96 	}
97 
98 	/* add end of list marker for the returned list */
99 	set_page_private(page, 0);
100 	/* actual return value, and adjustment of head */
101 	page = *head;
102 	*head = tmp;
103 	return page;
104 }
105 
106 /* may be used outside of locks to find the tail of a (usually short)
107  * "private" page chain, before adding it back to a global chain head
108  * with page_chain_add() under a spinlock. */
109 static struct page *page_chain_tail(struct page *page, int *len)
110 {
111 	struct page *tmp;
112 	int i = 1;
113 	while ((tmp = page_chain_next(page)))
114 		++i, page = tmp;
115 	if (len)
116 		*len = i;
117 	return page;
118 }
119 
120 static int page_chain_free(struct page *page)
121 {
122 	struct page *tmp;
123 	int i = 0;
124 	page_chain_for_each_safe(page, tmp) {
125 		put_page(page);
126 		++i;
127 	}
128 	return i;
129 }
130 
131 static void page_chain_add(struct page **head,
132 		struct page *chain_first, struct page *chain_last)
133 {
134 #if 1
135 	struct page *tmp;
136 	tmp = page_chain_tail(chain_first, NULL);
137 	BUG_ON(tmp != chain_last);
138 #endif
139 
140 	/* add chain to head */
141 	set_page_private(chain_last, (unsigned long)*head);
142 	*head = chain_first;
143 }
144 
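/* Grab @number pages from the global drbd_pp_pool if enough are vacant,
 * otherwise try to allocate them freshly.  If we cannot get all of them,
 * put any partial allocation back into the pool and return NULL. */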
145 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
146 {
147 	struct page *page = NULL;
148 	struct page *tmp = NULL;
149 	int i = 0;
150 
151 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
152 	 * So what. It saves a spin_lock. */
153 	if (drbd_pp_vacant >= number) {
154 		spin_lock(&drbd_pp_lock);
155 		page = page_chain_del(&drbd_pp_pool, number);
156 		if (page)
157 			drbd_pp_vacant -= number;
158 		spin_unlock(&drbd_pp_lock);
159 		if (page)
160 			return page;
161 	}
162 
163 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
164 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
165 	 * which in turn might block on the other node at this very place.  */
166 	for (i = 0; i < number; i++) {
167 		tmp = alloc_page(GFP_TRY);
168 		if (!tmp)
169 			break;
170 		set_page_private(tmp, (unsigned long)page);
171 		page = tmp;
172 	}
173 
174 	if (i == number)
175 		return page;
176 
177 	/* Not enough pages immediately available this time.
178 	 * No need to jump around here, drbd_pp_alloc will retry this
179 	 * function "soon". */
180 	if (page) {
181 		tmp = page_chain_tail(page, NULL);
182 		spin_lock(&drbd_pp_lock);
183 		page_chain_add(&drbd_pp_pool, page, tmp);
184 		drbd_pp_vacant += i;
185 		spin_unlock(&drbd_pp_lock);
186 	}
187 	return NULL;
188 }
189 
190 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
191 {
192 	struct drbd_epoch_entry *e;
193 	struct list_head *le, *tle;
194 
195 	/* The EEs are always appended to the end of the list. Since
196 	   they are sent in order over the wire, they have to finish
197    in order. As soon as we see the first one that is not finished, we can
198    stop examining the list... */
199 
200 	list_for_each_safe(le, tle, &mdev->net_ee) {
201 		e = list_entry(le, struct drbd_epoch_entry, w.list);
202 		if (drbd_ee_has_active_page(e))
203 			break;
204 		list_move(le, to_be_freed);
205 	}
206 }
207 
208 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
209 {
210 	LIST_HEAD(reclaimed);
211 	struct drbd_epoch_entry *e, *t;
212 
213 	spin_lock_irq(&mdev->req_lock);
214 	reclaim_net_ee(mdev, &reclaimed);
215 	spin_unlock_irq(&mdev->req_lock);
216 
217 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
218 		drbd_free_net_ee(mdev, e);
219 }
220 
221 /**
222  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
223  * @mdev:	DRBD device.
224  * @number:	number of pages requested
225  * @retry:	whether to retry, if not enough pages are available right now
226  *
227  * Tries to allocate number pages, first from our own page pool, then from
228  * the kernel, unless this allocation would exceed the max_buffers setting.
229  * Possibly retry until DRBD frees sufficient pages somewhere else.
230  *
231  * Returns a page chain linked via page->private.
232  */
233 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
234 {
235 	struct page *page = NULL;
236 	DEFINE_WAIT(wait);
237 
238 	/* Yes, we may run up to @number over max_buffers. If we
239 	 * follow it strictly, the admin will get it wrong anyways. */
240 	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
241 		page = drbd_pp_first_pages_or_try_alloc(mdev, number);
242 
243 	while (page == NULL) {
244 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
245 
246 		drbd_kick_lo_and_reclaim_net(mdev);
247 
248 		if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
249 			page = drbd_pp_first_pages_or_try_alloc(mdev, number);
250 			if (page)
251 				break;
252 		}
253 
254 		if (!retry)
255 			break;
256 
257 		if (signal_pending(current)) {
258 			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
259 			break;
260 		}
261 
262 		schedule();
263 	}
264 	finish_wait(&drbd_pp_wait, &wait);
265 
266 	if (page)
267 		atomic_add(number, &mdev->pp_in_use);
268 	return page;
269 }
270 
271 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
272  * Is also used from inside another spin_lock_irq(&mdev->req_lock);
273  * Either links the page chain back to the global pool,
274  * or returns all pages to the system. */
275 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
276 {
277 	atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
278 	int i;
279 
280 	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
281 		i = page_chain_free(page);
282 	else {
283 		struct page *tmp;
284 		tmp = page_chain_tail(page, &i);
285 		spin_lock(&drbd_pp_lock);
286 		page_chain_add(&drbd_pp_pool, page, tmp);
287 		drbd_pp_vacant += i;
288 		spin_unlock(&drbd_pp_lock);
289 	}
290 	i = atomic_sub_return(i, a);
291 	if (i < 0)
292 		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
293 			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
294 	wake_up(&drbd_pp_wait);
295 }
296 
297 /*
298 You need to hold the req_lock:
299  _drbd_wait_ee_list_empty()
300 
301 You must not have the req_lock:
302  drbd_free_ee()
303  drbd_alloc_ee()
304  drbd_init_ee()
305  drbd_release_ee()
306  drbd_ee_fix_bhs()
307  drbd_process_done_ee()
308  drbd_clear_done_ee()
309  drbd_wait_ee_list_empty()
310 */
311 
312 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
313 				     u64 id,
314 				     sector_t sector,
315 				     unsigned int data_size,
316 				     gfp_t gfp_mask) __must_hold(local)
317 {
318 	struct drbd_epoch_entry *e;
319 	struct page *page;
320 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
321 
322 	if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
323 		return NULL;
324 
325 	e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
326 	if (!e) {
327 		if (!(gfp_mask & __GFP_NOWARN))
328 			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
329 		return NULL;
330 	}
331 
332 	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
333 	if (!page)
334 		goto fail;
335 
336 	INIT_HLIST_NODE(&e->colision);
337 	e->epoch = NULL;
338 	e->mdev = mdev;
339 	e->pages = page;
340 	atomic_set(&e->pending_bios, 0);
341 	e->size = data_size;
342 	e->flags = 0;
343 	e->sector = sector;
344 	e->block_id = id;
345 
346 	return e;
347 
348  fail:
349 	mempool_free(e, drbd_ee_mempool);
350 	return NULL;
351 }
352 
353 void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
354 {
355 	if (e->flags & EE_HAS_DIGEST)
356 		kfree(e->digest);
357 	drbd_pp_free(mdev, e->pages, is_net);
358 	D_ASSERT(atomic_read(&e->pending_bios) == 0);
359 	D_ASSERT(hlist_unhashed(&e->colision));
360 	mempool_free(e, drbd_ee_mempool);
361 }
362 
363 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
364 {
365 	LIST_HEAD(work_list);
366 	struct drbd_epoch_entry *e, *t;
367 	int count = 0;
368 	int is_net = list == &mdev->net_ee;
369 
370 	spin_lock_irq(&mdev->req_lock);
371 	list_splice_init(list, &work_list);
372 	spin_unlock_irq(&mdev->req_lock);
373 
374 	list_for_each_entry_safe(e, t, &work_list, w.list) {
375 		drbd_free_some_ee(mdev, e, is_net);
376 		count++;
377 	}
378 	return count;
379 }
380 
381 
382 /*
383  * This function is called from _asender only_
384  * but see also comments in _req_mod(,barrier_acked)
385  * and receive_Barrier.
386  *
387  * Move entries from net_ee to done_ee, if ready.
388  * Grab done_ee, call all callbacks, free the entries.
389  * The callbacks typically send out ACKs.
390  */
391 static int drbd_process_done_ee(struct drbd_conf *mdev)
392 {
393 	LIST_HEAD(work_list);
394 	LIST_HEAD(reclaimed);
395 	struct drbd_epoch_entry *e, *t;
396 	int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
397 
398 	spin_lock_irq(&mdev->req_lock);
399 	reclaim_net_ee(mdev, &reclaimed);
400 	list_splice_init(&mdev->done_ee, &work_list);
401 	spin_unlock_irq(&mdev->req_lock);
402 
403 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
404 		drbd_free_net_ee(mdev, e);
405 
406 	/* possible callbacks here:
407 	 * e_end_block, and e_end_resync_block, e_send_discard_ack.
408 	 * all ignore the last argument.
409 	 */
410 	list_for_each_entry_safe(e, t, &work_list, w.list) {
411 		/* list_del not necessary, next/prev members not touched */
412 		ok = e->w.cb(mdev, &e->w, !ok) && ok;
413 		drbd_free_ee(mdev, e);
414 	}
415 	wake_up(&mdev->ee_wait);
416 
417 	return ok;
418 }
419 
420 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
421 {
422 	DEFINE_WAIT(wait);
423 
424 	/* avoids spin_lock/unlock
425 	 * and calling prepare_to_wait in the fast path */
426 	while (!list_empty(head)) {
427 		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
428 		spin_unlock_irq(&mdev->req_lock);
429 		io_schedule();
430 		finish_wait(&mdev->ee_wait, &wait);
431 		spin_lock_irq(&mdev->req_lock);
432 	}
433 }
434 
435 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
436 {
437 	spin_lock_irq(&mdev->req_lock);
438 	_drbd_wait_ee_list_empty(mdev, head);
439 	spin_unlock_irq(&mdev->req_lock);
440 }
441 
442 /* see also kernel_accept, which is only present since 2.6.18.
443  * Also, we want to log exactly which part of it failed. */
444 static int drbd_accept(struct drbd_conf *mdev, const char **what,
445 		struct socket *sock, struct socket **newsock)
446 {
447 	struct sock *sk = sock->sk;
448 	int err = 0;
449 
450 	*what = "listen";
451 	err = sock->ops->listen(sock, 5);
452 	if (err < 0)
453 		goto out;
454 
455 	*what = "sock_create_lite";
456 	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
457 			       newsock);
458 	if (err < 0)
459 		goto out;
460 
461 	*what = "accept";
462 	err = sock->ops->accept(sock, *newsock, 0);
463 	if (err < 0) {
464 		sock_release(*newsock);
465 		*newsock = NULL;
466 		goto out;
467 	}
468 	(*newsock)->ops  = sock->ops;
469 
470 out:
471 	return err;
472 }
473 
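/* one recvmsg from an explicitly given socket; unlike drbd_recv() below,
 * it does not force the connection into C_BROKEN_PIPE on a short read */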
474 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
475 		    void *buf, size_t size, int flags)
476 {
477 	mm_segment_t oldfs;
478 	struct kvec iov = {
479 		.iov_base = buf,
480 		.iov_len = size,
481 	};
482 	struct msghdr msg = {
483 		.msg_iovlen = 1,
484 		.msg_iov = (struct iovec *)&iov,
485 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
486 	};
487 	int rv;
488 
489 	oldfs = get_fs();
490 	set_fs(KERNEL_DS);
491 	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
492 	set_fs(oldfs);
493 
494 	return rv;
495 }
496 
497 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
498 {
499 	mm_segment_t oldfs;
500 	struct kvec iov = {
501 		.iov_base = buf,
502 		.iov_len = size,
503 	};
504 	struct msghdr msg = {
505 		.msg_iovlen = 1,
506 		.msg_iov = (struct iovec *)&iov,
507 		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
508 	};
509 	int rv;
510 
511 	oldfs = get_fs();
512 	set_fs(KERNEL_DS);
513 
514 	for (;;) {
515 		rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
516 		if (rv == size)
517 			break;
518 
519 		/* Note:
520 		 * ECONNRESET	other side closed the connection
521 		 * ERESTARTSYS	(on  sock) we got a signal
522 		 */
523 
524 		if (rv < 0) {
525 			if (rv == -ECONNRESET)
526 				dev_info(DEV, "sock was reset by peer\n");
527 			else if (rv != -ERESTARTSYS)
528 				dev_err(DEV, "sock_recvmsg returned %d\n", rv);
529 			break;
530 		} else if (rv == 0) {
531 			dev_info(DEV, "sock was shut down by peer\n");
532 			break;
533 		} else	{
534 			/* signal came in, or peer/link went down,
535 			 * after we read a partial message
536 			 */
537 			/* D_ASSERT(signal_pending(current)); */
538 			break;
539 		}
540 	};
541 
542 	set_fs(oldfs);
543 
544 	if (rv != size)
545 		drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
546 
547 	return rv;
548 }
549 
550 /* quoting tcp(7):
551  *   On individual connections, the socket buffer size must be set prior to the
552  *   listen(2) or connect(2) calls in order to have it take effect.
553  * This is our wrapper to do so.
554  */
555 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
556 		unsigned int rcv)
557 {
558 	/* open coded SO_SNDBUF, SO_RCVBUF */
559 	if (snd) {
560 		sock->sk->sk_sndbuf = snd;
561 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
562 	}
563 	if (rcv) {
564 		sock->sk->sk_rcvbuf = rcv;
565 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
566 	}
567 }
568 
569 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
570 {
571 	const char *what;
572 	struct socket *sock;
573 	struct sockaddr_in6 src_in6;
574 	int err;
575 	int disconnect_on_error = 1;
576 
577 	if (!get_net_conf(mdev))
578 		return NULL;
579 
580 	what = "sock_create_kern";
581 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
582 		SOCK_STREAM, IPPROTO_TCP, &sock);
583 	if (err < 0) {
584 		sock = NULL;
585 		goto out;
586 	}
587 
588 	sock->sk->sk_rcvtimeo =
589 	sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
590 	drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
591 			mdev->net_conf->rcvbuf_size);
592 
593        /* explicitly bind to the configured IP as source IP
594 	*  for the outgoing connections.
595 	*  This is needed for multihomed hosts and to be
596 	*  able to use lo: interfaces for drbd.
597 	* Make sure to use 0 as port number, so linux selects
598 	*  a free one dynamically.
599 	*/
600 	memcpy(&src_in6, mdev->net_conf->my_addr,
601 	       min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
602 	if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
603 		src_in6.sin6_port = 0;
604 	else
605 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
606 
607 	what = "bind before connect";
608 	err = sock->ops->bind(sock,
609 			      (struct sockaddr *) &src_in6,
610 			      mdev->net_conf->my_addr_len);
611 	if (err < 0)
612 		goto out;
613 
614 	/* connect may fail, peer not yet available.
615 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
616 	disconnect_on_error = 0;
617 	what = "connect";
618 	err = sock->ops->connect(sock,
619 				 (struct sockaddr *)mdev->net_conf->peer_addr,
620 				 mdev->net_conf->peer_addr_len, 0);
621 
622 out:
623 	if (err < 0) {
624 		if (sock) {
625 			sock_release(sock);
626 			sock = NULL;
627 		}
628 		switch (-err) {
629 			/* timeout, busy, signal pending */
630 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
631 		case EINTR: case ERESTARTSYS:
632 			/* peer not (yet) available, network problem */
633 		case ECONNREFUSED: case ENETUNREACH:
634 		case EHOSTDOWN:    case EHOSTUNREACH:
635 			disconnect_on_error = 0;
636 			break;
637 		default:
638 			dev_err(DEV, "%s failed, err = %d\n", what, err);
639 		}
640 		if (disconnect_on_error)
641 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
642 	}
643 	put_net_conf(mdev);
644 	return sock;
645 }
646 
647 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
648 {
649 	int timeo, err;
650 	struct socket *s_estab = NULL, *s_listen;
651 	const char *what;
652 
653 	if (!get_net_conf(mdev))
654 		return NULL;
655 
656 	what = "sock_create_kern";
657 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
658 		SOCK_STREAM, IPPROTO_TCP, &s_listen);
659 	if (err) {
660 		s_listen = NULL;
661 		goto out;
662 	}
663 
664 	timeo = mdev->net_conf->try_connect_int * HZ;
665 	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
666 
667 	s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
668 	s_listen->sk->sk_rcvtimeo = timeo;
669 	s_listen->sk->sk_sndtimeo = timeo;
670 	drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
671 			mdev->net_conf->rcvbuf_size);
672 
673 	what = "bind before listen";
674 	err = s_listen->ops->bind(s_listen,
675 			      (struct sockaddr *) mdev->net_conf->my_addr,
676 			      mdev->net_conf->my_addr_len);
677 	if (err < 0)
678 		goto out;
679 
680 	err = drbd_accept(mdev, &what, s_listen, &s_estab);
681 
682 out:
683 	if (s_listen)
684 		sock_release(s_listen);
685 	if (err < 0) {
686 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
687 			dev_err(DEV, "%s failed, err = %d\n", what, err);
688 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
689 		}
690 	}
691 	put_net_conf(mdev);
692 
693 	return s_estab;
694 }
695 
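/* "fp" = first packet: right after a connection is established we send an
 * otherwise empty header whose command tells the peer whether this socket is
 * meant as the data socket (P_HAND_SHAKE_S) or the meta socket (P_HAND_SHAKE_M) */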
696 static int drbd_send_fp(struct drbd_conf *mdev,
697 	struct socket *sock, enum drbd_packets cmd)
698 {
699 	struct p_header80 *h = &mdev->data.sbuf.header.h80;
700 
701 	return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
702 }
703 
704 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
705 {
706 	struct p_header80 *h = &mdev->data.rbuf.header.h80;
707 	int rr;
708 
709 	rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
710 
711 	if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
712 		return be16_to_cpu(h->command);
713 
714 	return 0xffff;
715 }
716 
717 /**
718  * drbd_socket_okay() - Free the socket if its connection is not okay
719  * @mdev:	DRBD device.
720  * @sock:	pointer to the pointer to the socket.
721  */
722 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
723 {
724 	int rr;
725 	char tb[4];
726 
727 	if (!*sock)
728 		return false;
729 
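	/* non-blocking peek: either data is available (rr > 0) or there is
	 * simply nothing to read yet (-EAGAIN); both mean the socket is usable */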
730 	rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
731 
732 	if (rr > 0 || rr == -EAGAIN) {
733 		return true;
734 	} else {
735 		sock_release(*sock);
736 		*sock = NULL;
737 		return false;
738 	}
739 }
740 
741 /*
742  * return values:
743  *   1 yes, we have a valid connection
744  *   0 oops, did not work out, please try again
745  *  -1 peer talks different language,
746  *     no point in trying again, please go standalone.
747  *  -2 We do not have a network config...
748  */
749 static int drbd_connect(struct drbd_conf *mdev)
750 {
751 	struct socket *s, *sock, *msock;
752 	int try, h, ok;
753 
754 	D_ASSERT(!mdev->data.socket);
755 
756 	if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
757 		return -2;
758 
759 	clear_bit(DISCARD_CONCURRENT, &mdev->flags);
760 
761 	sock  = NULL;
762 	msock = NULL;
763 
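	/* We need two TCP connections per peer: "sock" carries the bulk data,
	 * "msock" carries meta data such as acks and pings.  Both sides try to
	 * connect and to accept; the first packet (see drbd_send_fp/drbd_recv_fp)
	 * decides which role a freshly established socket takes. */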
764 	do {
765 		for (try = 0;;) {
766 			/* 3 tries, this should take less than a second! */
767 			s = drbd_try_connect(mdev);
768 			if (s || ++try >= 3)
769 				break;
770 			/* give the other side time to call bind() & listen() */
771 			schedule_timeout_interruptible(HZ / 10);
772 		}
773 
774 		if (s) {
775 			if (!sock) {
776 				drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
777 				sock = s;
778 				s = NULL;
779 			} else if (!msock) {
780 				drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
781 				msock = s;
782 				s = NULL;
783 			} else {
784 				dev_err(DEV, "Logic error in drbd_connect()\n");
785 				goto out_release_sockets;
786 			}
787 		}
788 
789 		if (sock && msock) {
790 			schedule_timeout_interruptible(HZ / 10);
791 			ok = drbd_socket_okay(mdev, &sock);
792 			ok = drbd_socket_okay(mdev, &msock) && ok;
793 			if (ok)
794 				break;
795 		}
796 
797 retry:
798 		s = drbd_wait_for_connect(mdev);
799 		if (s) {
800 			try = drbd_recv_fp(mdev, s);
801 			drbd_socket_okay(mdev, &sock);
802 			drbd_socket_okay(mdev, &msock);
803 			switch (try) {
804 			case P_HAND_SHAKE_S:
805 				if (sock) {
806 					dev_warn(DEV, "initial packet S crossed\n");
807 					sock_release(sock);
808 				}
809 				sock = s;
810 				break;
811 			case P_HAND_SHAKE_M:
812 				if (msock) {
813 					dev_warn(DEV, "initial packet M crossed\n");
814 					sock_release(msock);
815 				}
816 				msock = s;
817 				set_bit(DISCARD_CONCURRENT, &mdev->flags);
818 				break;
819 			default:
820 				dev_warn(DEV, "Error receiving initial packet\n");
821 				sock_release(s);
822 				if (random32() & 1)
823 					goto retry;
824 			}
825 		}
826 
827 		if (mdev->state.conn <= C_DISCONNECTING)
828 			goto out_release_sockets;
829 		if (signal_pending(current)) {
830 			flush_signals(current);
831 			smp_rmb();
832 			if (get_t_state(&mdev->receiver) == Exiting)
833 				goto out_release_sockets;
834 		}
835 
836 		if (sock && msock) {
837 			ok = drbd_socket_okay(mdev, &sock);
838 			ok = drbd_socket_okay(mdev, &msock) && ok;
839 			if (ok)
840 				break;
841 		}
842 	} while (1);
843 
844 	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
845 	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
846 
847 	sock->sk->sk_allocation = GFP_NOIO;
848 	msock->sk->sk_allocation = GFP_NOIO;
849 
850 	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
851 	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
852 
853 	/* NOT YET ...
854 	 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
855 	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
856 	 * first set it to the P_HAND_SHAKE timeout,
857 	 * which we set to 4x the configured ping_timeout. */
858 	sock->sk->sk_sndtimeo =
859 	sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
860 
861 	msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
862 	msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
863 
864 	/* we don't want delays.
865 	 * we use TCP_CORK where appropriate, though */
866 	drbd_tcp_nodelay(sock);
867 	drbd_tcp_nodelay(msock);
868 
869 	mdev->data.socket = sock;
870 	mdev->meta.socket = msock;
871 	mdev->last_received = jiffies;
872 
873 	D_ASSERT(mdev->asender.task == NULL);
874 
875 	h = drbd_do_handshake(mdev);
876 	if (h <= 0)
877 		return h;
878 
879 	if (mdev->cram_hmac_tfm) {
880 		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
881 		switch (drbd_do_auth(mdev)) {
882 		case -1:
883 			dev_err(DEV, "Authentication of peer failed\n");
884 			return -1;
885 		case 0:
886 			dev_err(DEV, "Authentication of peer failed, trying again.\n");
887 			return 0;
888 		}
889 	}
890 
891 	if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
892 		return 0;
893 
894 	sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
895 	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
896 
897 	atomic_set(&mdev->packet_seq, 0);
898 	mdev->peer_seq = 0;
899 
900 	drbd_thread_start(&mdev->asender);
901 
902 	if (mdev->agreed_pro_version < 95 && get_ldev(mdev)) {
903 		drbd_setup_queue_param(mdev, DRBD_MAX_SIZE_H80_PACKET);
904 		put_ldev(mdev);
905 	}
906 
907 	if (drbd_send_protocol(mdev) == -1)
908 		return -1;
909 	drbd_send_sync_param(mdev, &mdev->sync_conf);
910 	drbd_send_sizes(mdev, 0, 0);
911 	drbd_send_uuids(mdev);
912 	drbd_send_state(mdev);
913 	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
914 	clear_bit(RESIZE_PENDING, &mdev->flags);
915 	mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
916 
917 	return 1;
918 
919 out_release_sockets:
920 	if (sock)
921 		sock_release(sock);
922 	if (msock)
923 		sock_release(msock);
924 	return -1;
925 }
926 
927 static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
928 {
929 	union p_header *h = &mdev->data.rbuf.header;
930 	int r;
931 
932 	r = drbd_recv(mdev, h, sizeof(*h));
933 	if (unlikely(r != sizeof(*h))) {
934 		if (!signal_pending(current))
935 			dev_warn(DEV, "short read expecting header on sock: r=%d\n", r);
936 		return false;
937 	}
938 
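	/* two on-the-wire header formats: the original h80 header with a 16 bit
	 * length field, and the h95 "big" header with a 32 bit length field */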
939 	if (likely(h->h80.magic == BE_DRBD_MAGIC)) {
940 		*cmd = be16_to_cpu(h->h80.command);
941 		*packet_size = be16_to_cpu(h->h80.length);
942 	} else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {
943 		*cmd = be16_to_cpu(h->h95.command);
944 		*packet_size = be32_to_cpu(h->h95.length);
945 	} else {
946 		dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
947 		    be32_to_cpu(h->h80.magic),
948 		    be16_to_cpu(h->h80.command),
949 		    be16_to_cpu(h->h80.length));
950 		return false;
951 	}
952 	mdev->last_received = jiffies;
953 
954 	return true;
955 }
956 
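/* If the configured write ordering requires it, flush the backing device.
 * If the flush fails, fall back to draining I/O (WO_drain_io) instead. */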
957 static void drbd_flush(struct drbd_conf *mdev)
958 {
959 	int rv;
960 
961 	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
962 		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
963 					NULL);
964 		if (rv) {
965 			dev_err(DEV, "local disk flush failed with status %d\n", rv);
966 			/* would rather check on EOPNOTSUPP, but that is not reliable.
967 			 * don't try again for ANY return value != 0
968 			 * if (rv == -EOPNOTSUPP) */
969 			drbd_bump_write_ordering(mdev, WO_drain_io);
970 		}
971 		put_ldev(mdev);
972 	}
973 }
974 
975 /**
976  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
977  * @mdev:	DRBD device.
978  * @epoch:	Epoch object.
979  * @ev:		Epoch event.
980  */
981 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
982 					       struct drbd_epoch *epoch,
983 					       enum epoch_event ev)
984 {
985 	int epoch_size;
986 	struct drbd_epoch *next_epoch;
987 	enum finish_epoch rv = FE_STILL_LIVE;
988 
989 	spin_lock(&mdev->epoch_lock);
990 	do {
991 		next_epoch = NULL;
992 
993 		epoch_size = atomic_read(&epoch->epoch_size);
994 
995 		switch (ev & ~EV_CLEANUP) {
996 		case EV_PUT:
997 			atomic_dec(&epoch->active);
998 			break;
999 		case EV_GOT_BARRIER_NR:
1000 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1001 			break;
1002 		case EV_BECAME_LAST:
1003 			/* nothing to do*/
1004 			break;
1005 		}
1006 
1007 		if (epoch_size != 0 &&
1008 		    atomic_read(&epoch->active) == 0 &&
1009 		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1010 			if (!(ev & EV_CLEANUP)) {
1011 				spin_unlock(&mdev->epoch_lock);
1012 				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1013 				spin_lock(&mdev->epoch_lock);
1014 			}
1015 			dec_unacked(mdev);
1016 
1017 			if (mdev->current_epoch != epoch) {
1018 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1019 				list_del(&epoch->list);
1020 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1021 				mdev->epochs--;
1022 				kfree(epoch);
1023 
1024 				if (rv == FE_STILL_LIVE)
1025 					rv = FE_DESTROYED;
1026 			} else {
1027 				epoch->flags = 0;
1028 				atomic_set(&epoch->epoch_size, 0);
1029 				/* atomic_set(&epoch->active, 0); is already zero */
1030 				if (rv == FE_STILL_LIVE)
1031 					rv = FE_RECYCLED;
1032 				wake_up(&mdev->ee_wait);
1033 			}
1034 		}
1035 
1036 		if (!next_epoch)
1037 			break;
1038 
1039 		epoch = next_epoch;
1040 	} while (1);
1041 
1042 	spin_unlock(&mdev->epoch_lock);
1043 
1044 	return rv;
1045 }
1046 
1047 /**
1048  * drbd_bump_write_ordering() - Fall back to another write ordering method
1049  * @mdev:	DRBD device.
1050  * @wo:		Write ordering method to try.
1051  */
1052 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1053 {
1054 	enum write_ordering_e pwo;
1055 	static char *write_ordering_str[] = {
1056 		[WO_none] = "none",
1057 		[WO_drain_io] = "drain",
1058 		[WO_bdev_flush] = "flush",
1059 	};
1060 
1061 	pwo = mdev->write_ordering;
1062 	wo = min(pwo, wo);
1063 	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1064 		wo = WO_drain_io;
1065 	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1066 		wo = WO_none;
1067 	mdev->write_ordering = wo;
1068 	if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1069 		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1070 }
1071 
1072 /**
1073  * drbd_submit_ee()
1074  * @mdev:	DRBD device.
1075  * @e:		epoch entry
1076  * @rw:		flag field, see bio->bi_rw
1077  *
1078  * May spread the pages to multiple bios,
1079  * depending on bio_add_page restrictions.
1080  *
1081  * Returns 0 if all bios have been submitted,
1082  * -ENOMEM if we could not allocate enough bios,
1083  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1084  *  single page to an empty bio (which should never happen and likely indicates
1085  *  that the lower level IO stack is in some way broken). This has been observed
1086  *  on certain Xen deployments.
1087  */
1088 /* TODO allocate from our own bio_set. */
1089 int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1090 		const unsigned rw, const int fault_type)
1091 {
1092 	struct bio *bios = NULL;
1093 	struct bio *bio;
1094 	struct page *page = e->pages;
1095 	sector_t sector = e->sector;
1096 	unsigned ds = e->size;
1097 	unsigned n_bios = 0;
1098 	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1099 	int err = -ENOMEM;
1100 
1101 	/* In most cases, we will only need one bio.  But in case the lower
1102 	 * level restrictions happen to be different at this offset on this
1103 	 * side than those of the sending peer, we may need to submit the
1104 	 * request in more than one bio. */
1105 next_bio:
1106 	bio = bio_alloc(GFP_NOIO, nr_pages);
1107 	if (!bio) {
1108 		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1109 		goto fail;
1110 	}
1111 	/* > e->sector, unless this is the first bio */
1112 	bio->bi_sector = sector;
1113 	bio->bi_bdev = mdev->ldev->backing_bdev;
1114 	bio->bi_rw = rw;
1115 	bio->bi_private = e;
1116 	bio->bi_end_io = drbd_endio_sec;
1117 
1118 	bio->bi_next = bios;
1119 	bios = bio;
1120 	++n_bios;
1121 
1122 	page_chain_for_each(page) {
1123 		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1124 		if (!bio_add_page(bio, page, len, 0)) {
1125 			/* A single page must always be possible!
1126 			 * But in case it fails anyways,
1127 			 * we deal with it, and complain (below). */
1128 			if (bio->bi_vcnt == 0) {
1129 				dev_err(DEV,
1130 					"bio_add_page failed for len=%u, "
1131 					"bi_vcnt=0 (bi_sector=%llu)\n",
1132 					len, (unsigned long long)bio->bi_sector);
1133 				err = -ENOSPC;
1134 				goto fail;
1135 			}
1136 			goto next_bio;
1137 		}
1138 		ds -= len;
1139 		sector += len >> 9;
1140 		--nr_pages;
1141 	}
1142 	D_ASSERT(page == NULL);
1143 	D_ASSERT(ds == 0);
1144 
1145 	atomic_set(&e->pending_bios, n_bios);
1146 	do {
1147 		bio = bios;
1148 		bios = bios->bi_next;
1149 		bio->bi_next = NULL;
1150 
1151 		drbd_generic_make_request(mdev, fault_type, bio);
1152 	} while (bios);
1153 	return 0;
1154 
1155 fail:
1156 	while (bios) {
1157 		bio = bios;
1158 		bios = bios->bi_next;
1159 		bio_put(bio);
1160 	}
1161 	return err;
1162 }
1163 
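/* The peer closed its current write epoch (P_BARRIER).  Depending on the
 * write ordering policy, drain and/or flush outstanding writes, then install
 * a fresh epoch (or recycle the current one if it is still empty). */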
1164 static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1165 {
1166 	int rv;
1167 	struct p_barrier *p = &mdev->data.rbuf.barrier;
1168 	struct drbd_epoch *epoch;
1169 
1170 	inc_unacked(mdev);
1171 
1172 	mdev->current_epoch->barrier_nr = p->barrier;
1173 	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1174 
1175 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1176 	 * the activity log, which means it would not be resynced in case the
1177 	 * R_PRIMARY crashes now.
1178 	 * Therefore we must send the barrier_ack after the barrier request was
1179 	 * completed. */
1180 	switch (mdev->write_ordering) {
1181 	case WO_none:
1182 		if (rv == FE_RECYCLED)
1183 			return true;
1184 
1185 		/* receiver context, in the writeout path of the other node.
1186 		 * avoid potential distributed deadlock */
1187 		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1188 		if (epoch)
1189 			break;
1190 		else
1191 			dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1192 			/* Fall through */
1193 
1194 	case WO_bdev_flush:
1195 	case WO_drain_io:
1196 		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1197 		drbd_flush(mdev);
1198 
1199 		if (atomic_read(&mdev->current_epoch->epoch_size)) {
1200 			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1201 			if (epoch)
1202 				break;
1203 		}
1204 
1205 		epoch = mdev->current_epoch;
1206 		wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1207 
1208 		D_ASSERT(atomic_read(&epoch->active) == 0);
1209 		D_ASSERT(epoch->flags == 0);
1210 
1211 		return true;
1212 	default:
1213 		dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1214 		return false;
1215 	}
1216 
1217 	epoch->flags = 0;
1218 	atomic_set(&epoch->epoch_size, 0);
1219 	atomic_set(&epoch->active, 0);
1220 
1221 	spin_lock(&mdev->epoch_lock);
1222 	if (atomic_read(&mdev->current_epoch->epoch_size)) {
1223 		list_add(&epoch->list, &mdev->current_epoch->list);
1224 		mdev->current_epoch = epoch;
1225 		mdev->epochs++;
1226 	} else {
1227 		/* The current_epoch got recycled while we allocated this one... */
1228 		kfree(epoch);
1229 	}
1230 	spin_unlock(&mdev->epoch_lock);
1231 
1232 	return true;
1233 }
1234 
1235 /* used from receive_RSDataReply (recv_resync_read)
1236  * and from receive_Data */
1237 static struct drbd_epoch_entry *
1238 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1239 {
1240 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1241 	struct drbd_epoch_entry *e;
1242 	struct page *page;
1243 	int dgs, ds, rr;
1244 	void *dig_in = mdev->int_dig_in;
1245 	void *dig_vv = mdev->int_dig_vv;
1246 	unsigned long *data;
1247 
1248 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1249 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1250 
1251 	if (dgs) {
1252 		rr = drbd_recv(mdev, dig_in, dgs);
1253 		if (rr != dgs) {
1254 			if (!signal_pending(current))
1255 				dev_warn(DEV,
1256 					"short read receiving data digest: read %d expected %d\n",
1257 					rr, dgs);
1258 			return NULL;
1259 		}
1260 	}
1261 
1262 	data_size -= dgs;
1263 
1264 	ERR_IF(data_size == 0) return NULL;
1265 	ERR_IF(data_size &  0x1ff) return NULL;
1266 	ERR_IF(data_size >  DRBD_MAX_BIO_SIZE) return NULL;
1267 
1268 	/* even though we trust our peer,
1269 	 * we sometimes have to double check. */
1270 	if (sector + (data_size>>9) > capacity) {
1271 		dev_err(DEV, "request from peer beyond end of local disk: "
1272 			"capacity: %llus < sector: %llus + size: %u\n",
1273 			(unsigned long long)capacity,
1274 			(unsigned long long)sector, data_size);
1275 		return NULL;
1276 	}
1277 
1278 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1279 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1280 	 * which in turn might block on the other node at this very place.  */
1281 	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1282 	if (!e)
1283 		return NULL;
1284 
1285 	ds = data_size;
1286 	page = e->pages;
1287 	page_chain_for_each(page) {
1288 		unsigned len = min_t(int, ds, PAGE_SIZE);
1289 		data = kmap(page);
1290 		rr = drbd_recv(mdev, data, len);
1291 		if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1292 			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1293 			data[0] = data[0] ^ (unsigned long)-1;
1294 		}
1295 		kunmap(page);
1296 		if (rr != len) {
1297 			drbd_free_ee(mdev, e);
1298 			if (!signal_pending(current))
1299 				dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1300 				rr, len);
1301 			return NULL;
1302 		}
1303 		ds -= rr;
1304 	}
1305 
1306 	if (dgs) {
1307 		drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1308 		if (memcmp(dig_in, dig_vv, dgs)) {
1309 			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1310 				(unsigned long long)sector, data_size);
1311 			drbd_bcast_ee(mdev, "digest failed",
1312 					dgs, dig_in, dig_vv, e);
1313 			drbd_free_ee(mdev, e);
1314 			return NULL;
1315 		}
1316 	}
1317 	mdev->recv_cnt += data_size>>9;
1318 	return e;
1319 }
1320 
1321 /* drbd_drain_block() just takes a data block
1322  * out of the socket input buffer, and discards it.
1323  */
1324 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1325 {
1326 	struct page *page;
1327 	int rr, rv = 1;
1328 	void *data;
1329 
1330 	if (!data_size)
1331 		return true;
1332 
1333 	page = drbd_pp_alloc(mdev, 1, 1);
1334 
1335 	data = kmap(page);
1336 	while (data_size) {
1337 		rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1338 		if (rr != min_t(int, data_size, PAGE_SIZE)) {
1339 			rv = 0;
1340 			if (!signal_pending(current))
1341 				dev_warn(DEV,
1342 					"short read receiving data: read %d expected %d\n",
1343 					rr, min_t(int, data_size, PAGE_SIZE));
1344 			break;
1345 		}
1346 		data_size -= rr;
1347 	}
1348 	kunmap(page);
1349 	drbd_pp_free(mdev, page, 0);
1350 	return rv;
1351 }
1352 
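/* receive a data reply into the bio of the original (local) read request;
 * used when the read was served by the peer, e.g. while we are diskless */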
1353 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1354 			   sector_t sector, int data_size)
1355 {
1356 	struct bio_vec *bvec;
1357 	struct bio *bio;
1358 	int dgs, rr, i, expect;
1359 	void *dig_in = mdev->int_dig_in;
1360 	void *dig_vv = mdev->int_dig_vv;
1361 
1362 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1363 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1364 
1365 	if (dgs) {
1366 		rr = drbd_recv(mdev, dig_in, dgs);
1367 		if (rr != dgs) {
1368 			if (!signal_pending(current))
1369 				dev_warn(DEV,
1370 					"short read receiving data reply digest: read %d expected %d\n",
1371 					rr, dgs);
1372 			return 0;
1373 		}
1374 	}
1375 
1376 	data_size -= dgs;
1377 
1378 	/* optimistically update recv_cnt.  if receiving fails below,
1379 	 * we disconnect anyways, and counters will be reset. */
1380 	mdev->recv_cnt += data_size>>9;
1381 
1382 	bio = req->master_bio;
1383 	D_ASSERT(sector == bio->bi_sector);
1384 
1385 	bio_for_each_segment(bvec, bio, i) {
1386 		expect = min_t(int, data_size, bvec->bv_len);
1387 		rr = drbd_recv(mdev,
1388 			     kmap(bvec->bv_page)+bvec->bv_offset,
1389 			     expect);
1390 		kunmap(bvec->bv_page);
1391 		if (rr != expect) {
1392 			if (!signal_pending(current))
1393 				dev_warn(DEV, "short read receiving data reply: "
1394 					"read %d expected %d\n",
1395 					rr, expect);
1396 			return 0;
1397 		}
1398 		data_size -= rr;
1399 	}
1400 
1401 	if (dgs) {
1402 		drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1403 		if (memcmp(dig_in, dig_vv, dgs)) {
1404 			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1405 			return 0;
1406 		}
1407 	}
1408 
1409 	D_ASSERT(data_size == 0);
1410 	return 1;
1411 }
1412 
1413 /* e_end_resync_block() is called via
1414  * drbd_process_done_ee() by asender only */
1415 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1416 {
1417 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1418 	sector_t sector = e->sector;
1419 	int ok;
1420 
1421 	D_ASSERT(hlist_unhashed(&e->colision));
1422 
1423 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1424 		drbd_set_in_sync(mdev, sector, e->size);
1425 		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1426 	} else {
1427 		/* Record failure to sync */
1428 		drbd_rs_failed_io(mdev, sector, e->size);
1429 
1430 		ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1431 	}
1432 	dec_unacked(mdev);
1433 
1434 	return ok;
1435 }
1436 
1437 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1438 {
1439 	struct drbd_epoch_entry *e;
1440 
1441 	e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1442 	if (!e)
1443 		goto fail;
1444 
1445 	dec_rs_pending(mdev);
1446 
1447 	inc_unacked(mdev);
1448 	/* corresponding dec_unacked() in e_end_resync_block()
1449 	 * respective _drbd_clear_done_ee */
1450 
1451 	e->w.cb = e_end_resync_block;
1452 
1453 	spin_lock_irq(&mdev->req_lock);
1454 	list_add(&e->w.list, &mdev->sync_ee);
1455 	spin_unlock_irq(&mdev->req_lock);
1456 
1457 	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1458 	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1459 		return true;
1460 
1461 	/* don't care for the reason here */
1462 	dev_err(DEV, "submit failed, triggering re-connect\n");
1463 	spin_lock_irq(&mdev->req_lock);
1464 	list_del(&e->w.list);
1465 	spin_unlock_irq(&mdev->req_lock);
1466 
1467 	drbd_free_ee(mdev, e);
1468 fail:
1469 	put_ldev(mdev);
1470 	return false;
1471 }
1472 
1473 static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1474 {
1475 	struct drbd_request *req;
1476 	sector_t sector;
1477 	int ok;
1478 	struct p_data *p = &mdev->data.rbuf.data;
1479 
1480 	sector = be64_to_cpu(p->sector);
1481 
1482 	spin_lock_irq(&mdev->req_lock);
1483 	req = _ar_id_to_req(mdev, p->block_id, sector);
1484 	spin_unlock_irq(&mdev->req_lock);
1485 	if (unlikely(!req)) {
1486 		dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1487 		return false;
1488 	}
1489 
1490 	/* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1491 	 * special casing it there for the various failure cases.
1492 	 * still no race with drbd_fail_pending_reads */
1493 	ok = recv_dless_read(mdev, req, sector, data_size);
1494 
1495 	if (ok)
1496 		req_mod(req, data_received);
1497 	/* else: nothing. handled from drbd_disconnect...
1498 	 * I don't think we may complete this just yet
1499 	 * in case we are "on-disconnect: freeze" */
1500 
1501 	return ok;
1502 }
1503 
1504 static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1505 {
1506 	sector_t sector;
1507 	int ok;
1508 	struct p_data *p = &mdev->data.rbuf.data;
1509 
1510 	sector = be64_to_cpu(p->sector);
1511 	D_ASSERT(p->block_id == ID_SYNCER);
1512 
1513 	if (get_ldev(mdev)) {
1514 		/* data is submitted to disk within recv_resync_read.
1515 		 * corresponding put_ldev done below on error,
1516 		 * or in drbd_endio_write_sec. */
1517 		ok = recv_resync_read(mdev, sector, data_size);
1518 	} else {
1519 		if (__ratelimit(&drbd_ratelimit_state))
1520 			dev_err(DEV, "Can not write resync data to local disk.\n");
1521 
1522 		ok = drbd_drain_block(mdev, data_size);
1523 
1524 		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1525 	}
1526 
1527 	atomic_add(data_size >> 9, &mdev->rs_sect_in);
1528 
1529 	return ok;
1530 }
1531 
1532 /* e_end_block() is called via drbd_process_done_ee().
1533  * this means this function only runs in the asender thread
1534  */
1535 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1536 {
1537 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1538 	sector_t sector = e->sector;
1539 	int ok = 1, pcmd;
1540 
1541 	if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1542 		if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1543 			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1544 				mdev->state.conn <= C_PAUSED_SYNC_T &&
1545 				e->flags & EE_MAY_SET_IN_SYNC) ?
1546 				P_RS_WRITE_ACK : P_WRITE_ACK;
1547 			ok &= drbd_send_ack(mdev, pcmd, e);
1548 			if (pcmd == P_RS_WRITE_ACK)
1549 				drbd_set_in_sync(mdev, sector, e->size);
1550 		} else {
1551 			ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1552 			/* we expect it to be marked out of sync anyways...
1553 			 * maybe assert this?  */
1554 		}
1555 		dec_unacked(mdev);
1556 	}
1557 	/* we delete from the conflict detection hash _after_ we sent out the
1558 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1559 	if (mdev->net_conf->two_primaries) {
1560 		spin_lock_irq(&mdev->req_lock);
1561 		D_ASSERT(!hlist_unhashed(&e->colision));
1562 		hlist_del_init(&e->colision);
1563 		spin_unlock_irq(&mdev->req_lock);
1564 	} else {
1565 		D_ASSERT(hlist_unhashed(&e->colision));
1566 	}
1567 
1568 	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1569 
1570 	return ok;
1571 }
1572 
1573 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1574 {
1575 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1576 	int ok = 1;
1577 
1578 	D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1579 	ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1580 
1581 	spin_lock_irq(&mdev->req_lock);
1582 	D_ASSERT(!hlist_unhashed(&e->colision));
1583 	hlist_del_init(&e->colision);
1584 	spin_unlock_irq(&mdev->req_lock);
1585 
1586 	dec_unacked(mdev);
1587 
1588 	return ok;
1589 }
1590 
1591 /* Called from receive_Data.
1592  * Synchronize packets on sock with packets on msock.
1593  *
1594  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1595  * packet traveling on msock, they are still processed in the order they have
1596  * been sent.
1597  *
1598  * Note: we don't care for Ack packets overtaking P_DATA packets.
1599  *
1600  * In case packet_seq is larger than mdev->peer_seq number, there are
1601  * outstanding packets on the msock. We wait for them to arrive.
1602  * In case we are the logically next packet, we update mdev->peer_seq
1603  * ourselves. Correctly handles 32bit wrap around.
1604  *
1605  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1606  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1607  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1608  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1609  *
1610  * returns 0 if we may process the packet,
1611  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1612 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1613 {
1614 	DEFINE_WAIT(wait);
1615 	unsigned int p_seq;
1616 	long timeout;
1617 	int ret = 0;
1618 	spin_lock(&mdev->peer_seq_lock);
1619 	for (;;) {
1620 		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1621 		if (seq_le(packet_seq, mdev->peer_seq+1))
1622 			break;
1623 		if (signal_pending(current)) {
1624 			ret = -ERESTARTSYS;
1625 			break;
1626 		}
1627 		p_seq = mdev->peer_seq;
1628 		spin_unlock(&mdev->peer_seq_lock);
1629 		timeout = schedule_timeout(30*HZ);
1630 		spin_lock(&mdev->peer_seq_lock);
1631 		if (timeout == 0 && p_seq == mdev->peer_seq) {
1632 			ret = -ETIMEDOUT;
1633 			dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1634 			break;
1635 		}
1636 	}
1637 	finish_wait(&mdev->seq_wait, &wait);
1638 	if (mdev->peer_seq+1 == packet_seq)
1639 		mdev->peer_seq++;
1640 	spin_unlock(&mdev->peer_seq_lock);
1641 	return ret;
1642 }
1643 
1644 /* see also bio_flags_to_wire()
1645  * DRBD_REQ_*, because we need to semantically map the flags to data packet
1646  * flags and back. We may replicate to other kernel versions. */
1647 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1648 {
1649 	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1650 		(dpf & DP_FUA ? REQ_FUA : 0) |
1651 		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1652 		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
1653 }
1654 
1655 /* mirrored write */
1656 static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1657 {
1658 	sector_t sector;
1659 	struct drbd_epoch_entry *e;
1660 	struct p_data *p = &mdev->data.rbuf.data;
1661 	int rw = WRITE;
1662 	u32 dp_flags;
1663 
1664 	if (!get_ldev(mdev)) {
1665 		spin_lock(&mdev->peer_seq_lock);
1666 		if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1667 			mdev->peer_seq++;
1668 		spin_unlock(&mdev->peer_seq_lock);
1669 
1670 		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1671 		atomic_inc(&mdev->current_epoch->epoch_size);
1672 		return drbd_drain_block(mdev, data_size);
1673 	}
1674 
1675 	/* get_ldev(mdev) successful.
1676 	 * Corresponding put_ldev done either below (on various errors),
1677 	 * or in drbd_endio_write_sec, if we successfully submit the data at
1678 	 * the end of this function. */
1679 
1680 	sector = be64_to_cpu(p->sector);
1681 	e = read_in_block(mdev, p->block_id, sector, data_size);
1682 	if (!e) {
1683 		put_ldev(mdev);
1684 		return false;
1685 	}
1686 
1687 	e->w.cb = e_end_block;
1688 
1689 	dp_flags = be32_to_cpu(p->dp_flags);
1690 	rw |= wire_flags_to_bio(mdev, dp_flags);
1691 
1692 	if (dp_flags & DP_MAY_SET_IN_SYNC)
1693 		e->flags |= EE_MAY_SET_IN_SYNC;
1694 
1695 	spin_lock(&mdev->epoch_lock);
1696 	e->epoch = mdev->current_epoch;
1697 	atomic_inc(&e->epoch->epoch_size);
1698 	atomic_inc(&e->epoch->active);
1699 	spin_unlock(&mdev->epoch_lock);
1700 
1701 	/* I'm the receiver, I do hold a net_cnt reference. */
1702 	if (!mdev->net_conf->two_primaries) {
1703 		spin_lock_irq(&mdev->req_lock);
1704 	} else {
1705 		/* don't get the req_lock yet,
1706 		 * we may sleep in drbd_wait_peer_seq */
1707 		const int size = e->size;
1708 		const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1709 		DEFINE_WAIT(wait);
1710 		struct drbd_request *i;
1711 		struct hlist_node *n;
1712 		struct hlist_head *slot;
1713 		int first;
1714 
1715 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1716 		BUG_ON(mdev->ee_hash == NULL);
1717 		BUG_ON(mdev->tl_hash == NULL);
1718 
1719 		/* conflict detection and handling:
1720 		 * 1. wait on the sequence number,
1721 		 *    in case this data packet overtook ACK packets.
1722 		 * 2. check our hash tables for conflicting requests.
1723 		 *    we only need to walk the tl_hash, since an ee can not
1724 		 *    have a conflict with another ee: on the submitting
1725 		 *    node, the corresponding req had already been conflicting,
1726 		 *    and a conflicting req is never sent.
1727 		 *
1728 		 * Note: for two_primaries, we are protocol C,
1729 		 * so there cannot be any request that is DONE
1730 		 * but still on the transfer log.
1731 		 *
1732 		 * unconditionally add to the ee_hash.
1733 		 *
1734 		 * if no conflicting request is found:
1735 		 *    submit.
1736 		 *
1737 		 * if any conflicting request is found
1738 		 * that has not yet been acked,
1739 		 * AND I have the "discard concurrent writes" flag:
1740 		 *	 queue (via done_ee) the P_DISCARD_ACK; OUT.
1741 		 *
1742 		 * if any conflicting request is found:
1743 		 *	 block the receiver, waiting on misc_wait
1744 		 *	 until no more conflicting requests are there,
1745 		 *	 or we get interrupted (disconnect).
1746 		 *
1747 		 *	 we do not just write after local io completion of those
1748 		 *	 requests, but only after req is done completely, i.e.
1749 		 *	 we wait for the P_DISCARD_ACK to arrive!
1750 		 *
1751 		 *	 then proceed normally, i.e. submit.
1752 		 */
1753 		if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1754 			goto out_interrupted;
1755 
1756 		spin_lock_irq(&mdev->req_lock);
1757 
1758 		hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1759 
1760 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1761 		slot = tl_hash_slot(mdev, sector);
1762 		first = 1;
1763 		for (;;) {
1764 			int have_unacked = 0;
1765 			int have_conflict = 0;
1766 			prepare_to_wait(&mdev->misc_wait, &wait,
1767 				TASK_INTERRUPTIBLE);
1768 			hlist_for_each_entry(i, n, slot, colision) {
1769 				if (OVERLAPS) {
1770 					/* only ALERT on first iteration,
1771 					 * we may be woken up early... */
1772 					if (first)
1773 						dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1774 						      "	new: %llus +%u; pending: %llus +%u\n",
1775 						      current->comm, current->pid,
1776 						      (unsigned long long)sector, size,
1777 						      (unsigned long long)i->sector, i->size);
1778 					if (i->rq_state & RQ_NET_PENDING)
1779 						++have_unacked;
1780 					++have_conflict;
1781 				}
1782 			}
1783 #undef OVERLAPS
1784 			if (!have_conflict)
1785 				break;
1786 
1787 			/* Discard Ack only for the _first_ iteration */
1788 			if (first && discard && have_unacked) {
1789 				dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1790 				     (unsigned long long)sector);
1791 				inc_unacked(mdev);
1792 				e->w.cb = e_send_discard_ack;
1793 				list_add_tail(&e->w.list, &mdev->done_ee);
1794 
1795 				spin_unlock_irq(&mdev->req_lock);
1796 
1797 				/* we could probably send that P_DISCARD_ACK ourselves,
1798 				 * but I don't like the receiver using the msock */
1799 
1800 				put_ldev(mdev);
1801 				wake_asender(mdev);
1802 				finish_wait(&mdev->misc_wait, &wait);
1803 				return true;
1804 			}
1805 
1806 			if (signal_pending(current)) {
1807 				hlist_del_init(&e->colision);
1808 
1809 				spin_unlock_irq(&mdev->req_lock);
1810 
1811 				finish_wait(&mdev->misc_wait, &wait);
1812 				goto out_interrupted;
1813 			}
1814 
1815 			spin_unlock_irq(&mdev->req_lock);
1816 			if (first) {
1817 				first = 0;
1818 				dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1819 				     "sec=%llus\n", (unsigned long long)sector);
1820 			} else if (discard) {
1821 				/* we had none on the first iteration.
1822 				 * there must be none now. */
1823 				D_ASSERT(have_unacked == 0);
1824 			}
1825 			schedule();
1826 			spin_lock_irq(&mdev->req_lock);
1827 		}
1828 		finish_wait(&mdev->misc_wait, &wait);
1829 	}
1830 
1831 	list_add(&e->w.list, &mdev->active_ee);
1832 	spin_unlock_irq(&mdev->req_lock);
1833 
1834 	switch (mdev->net_conf->wire_protocol) {
1835 	case DRBD_PROT_C:
1836 		inc_unacked(mdev);
1837 		/* corresponding dec_unacked() in e_end_block()
1838 		 * respective _drbd_clear_done_ee */
1839 		break;
1840 	case DRBD_PROT_B:
1841 		/* I really don't like it that the receiver thread
1842 		 * sends on the msock, but anyways */
1843 		drbd_send_ack(mdev, P_RECV_ACK, e);
1844 		break;
1845 	case DRBD_PROT_A:
1846 		/* nothing to do */
1847 		break;
1848 	}
1849 
1850 	if (mdev->state.pdsk < D_INCONSISTENT) {
1851 		/* In case we have the only disk of the cluster: mark it out of sync and cover the write with the activity log. */
1852 		drbd_set_out_of_sync(mdev, e->sector, e->size);
1853 		e->flags |= EE_CALL_AL_COMPLETE_IO;
1854 		e->flags &= ~EE_MAY_SET_IN_SYNC;
1855 		drbd_al_begin_io(mdev, e->sector);
1856 	}
1857 
1858 	if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
1859 		return true;
1860 
1861 	/* don't care for the reason here */
1862 	dev_err(DEV, "submit failed, triggering re-connect\n");
1863 	spin_lock_irq(&mdev->req_lock);
1864 	list_del(&e->w.list);
1865 	hlist_del_init(&e->colision);
1866 	spin_unlock_irq(&mdev->req_lock);
1867 	if (e->flags & EE_CALL_AL_COMPLETE_IO)
1868 		drbd_al_complete_io(mdev, e->sector);
1869 
1870 out_interrupted:
1871 	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + EV_CLEANUP);
1872 	put_ldev(mdev);
1873 	drbd_free_ee(mdev, e);
1874 	return false;
1875 }
1876 
1877 /* We may throttle resync, if the lower device seems to be busy,
1878  * and current sync rate is above c_min_rate.
1879  *
1880  * To decide whether or not the lower device is busy, we use a scheme similar
1881  * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
1882  * activity (more than 64 sectors) that we cannot account for with our own
1883  * resync activity, the lower device obviously is "busy".
1884  *
1885  * The current sync rate used here uses only the most recent two step marks,
1886  * to have a short time average so we can react faster.
1887  */
1888 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
1889 {
1890 	struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
1891 	unsigned long db, dt, dbdt;
1892 	struct lc_element *tmp;
1893 	int curr_events;
1894 	int throttle = 0;
1895 
1896 	/* feature disabled? */
1897 	if (mdev->sync_conf.c_min_rate == 0)
1898 		return 0;
1899 
1900 	spin_lock_irq(&mdev->al_lock);
1901 	tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
1902 	if (tmp) {
1903 		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
1904 		if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
1905 			spin_unlock_irq(&mdev->al_lock);
1906 			return 0;
1907 		}
1908 		/* Do not slow down if app IO is already waiting for this extent */
1909 	}
1910 	spin_unlock_irq(&mdev->al_lock);
1911 
1912 	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
1913 		      (int)part_stat_read(&disk->part0, sectors[1]) -
1914 			atomic_read(&mdev->rs_sect_ev);
1915 
1916 	if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
1917 		unsigned long rs_left;
1918 		int i;
1919 
1920 		mdev->rs_last_events = curr_events;
1921 
1922 		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
1923 		 * approx. */
1924 		i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
1925 
1926 		if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
1927 			rs_left = mdev->ov_left;
1928 		else
1929 			rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
1930 
1931 		dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
1932 		if (!dt)
1933 			dt++;
1934 		db = mdev->rs_mark_left[i] - rs_left;
1935 		dbdt = Bit2KB(db/dt);
1936 
1937 		if (dbdt > mdev->sync_conf.c_min_rate)
1938 			throttle = 1;
1939 	}
1940 	return throttle;
1941 }
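/* Worked example for the throttle decision above (hypothetical numbers,
 * assuming 4 KiB of data per bitmap bit, as implied by Bit2KB()): if
 * rs_mark_left[i] was 10000 bits and rs_left is now 7952 bits after
 * dt = 4 seconds, then db = 2048 bits and dbdt = Bit2KB(2048/4) = 2048 KB/s.
 * With c_min_rate = 250 (KB/s) that exceeds the configured minimum rate, so
 * we throttle; with c_min_rate = 0 the feature is disabled entirely. */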
1942 
1943 
1944 static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
1945 {
1946 	sector_t sector;
1947 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1948 	struct drbd_epoch_entry *e;
1949 	struct digest_info *di = NULL;
1950 	int size, verb;
1951 	unsigned int fault_type;
1952 	struct p_block_req *p =	&mdev->data.rbuf.block_req;
1953 
1954 	sector = be64_to_cpu(p->sector);
1955 	size   = be32_to_cpu(p->blksize);
1956 
1957 	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
1958 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1959 				(unsigned long long)sector, size);
1960 		return false;
1961 	}
1962 	if (sector + (size>>9) > capacity) {
1963 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1964 				(unsigned long long)sector, size);
1965 		return false;
1966 	}
1967 
1968 	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1969 		verb = 1;
1970 		switch (cmd) {
1971 		case P_DATA_REQUEST:
1972 			drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
1973 			break;
1974 		case P_RS_DATA_REQUEST:
1975 		case P_CSUM_RS_REQUEST:
1976 		case P_OV_REQUEST:
1977 			drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
1978 			break;
1979 		case P_OV_REPLY:
1980 			verb = 0;
1981 			dec_rs_pending(mdev);
1982 			drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
1983 			break;
1984 		default:
1985 			dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
1986 				cmdname(cmd));
1987 		}
1988 		if (verb && __ratelimit(&drbd_ratelimit_state))
1989 			dev_err(DEV, "Can not satisfy peer's read request, "
1990 			    "no local data.\n");
1991 
1992 		/* drain possible payload */
1993 		return drbd_drain_block(mdev, digest_size);
1994 	}
1995 
1996 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1997 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1998 	 * which in turn might block on the other node at this very place.  */
1999 	e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2000 	if (!e) {
2001 		put_ldev(mdev);
2002 		return false;
2003 	}
2004 
2005 	switch (cmd) {
2006 	case P_DATA_REQUEST:
2007 		e->w.cb = w_e_end_data_req;
2008 		fault_type = DRBD_FAULT_DT_RD;
2009 		/* application IO, don't drbd_rs_begin_io */
2010 		goto submit;
2011 
2012 	case P_RS_DATA_REQUEST:
2013 		e->w.cb = w_e_end_rsdata_req;
2014 		fault_type = DRBD_FAULT_RS_RD;
2015 		/* used in the sector offset progress display */
2016 		mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2017 		break;
2018 
2019 	case P_OV_REPLY:
2020 	case P_CSUM_RS_REQUEST:
2021 		fault_type = DRBD_FAULT_RS_RD;
2022 		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2023 		if (!di)
2024 			goto out_free_e;
2025 
2026 		di->digest_size = digest_size;
2027 		di->digest = (((char *)di)+sizeof(struct digest_info));
2028 
2029 		e->digest = di;
2030 		e->flags |= EE_HAS_DIGEST;
2031 
2032 		if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2033 			goto out_free_e;
2034 
2035 		if (cmd == P_CSUM_RS_REQUEST) {
2036 			D_ASSERT(mdev->agreed_pro_version >= 89);
2037 			e->w.cb = w_e_end_csum_rs_req;
2038 			/* used in the sector offset progress display */
2039 			mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2040 		} else if (cmd == P_OV_REPLY) {
2041 			/* track progress, we may need to throttle */
2042 			atomic_add(size >> 9, &mdev->rs_sect_in);
2043 			e->w.cb = w_e_end_ov_reply;
2044 			dec_rs_pending(mdev);
2045 			/* drbd_rs_begin_io done when we sent this request,
2046 			 * but accounting still needs to be done. */
2047 			goto submit_for_resync;
2048 		}
2049 		break;
2050 
2051 	case P_OV_REQUEST:
2052 		if (mdev->ov_start_sector == ~(sector_t)0 &&
2053 		    mdev->agreed_pro_version >= 90) {
2054 			unsigned long now = jiffies;
2055 			int i;
2056 			mdev->ov_start_sector = sector;
2057 			mdev->ov_position = sector;
2058 			mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2059 			mdev->rs_total = mdev->ov_left;
2060 			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2061 				mdev->rs_mark_left[i] = mdev->ov_left;
2062 				mdev->rs_mark_time[i] = now;
2063 			}
2064 			dev_info(DEV, "Online Verify start sector: %llu\n",
2065 					(unsigned long long)sector);
2066 		}
2067 		e->w.cb = w_e_end_ov_req;
2068 		fault_type = DRBD_FAULT_RS_RD;
2069 		break;
2070 
2071 	default:
2072 		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2073 		    cmdname(cmd));
2074 		fault_type = DRBD_FAULT_MAX;
2075 		goto out_free_e;
2076 	}
2077 
2078 	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2079 	 * wrt the receiver, but it is not as straightforward as it may seem.
2080 	 * Various places in the resync start and stop logic assume resync
2081 	 * requests are processed in order, requeuing this on the worker thread
2082 	 * introduces a bunch of new code for synchronization between threads.
2083 	 *
2084 	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2085 	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2086 	 * for application writes for the same time.  For now, just throttle
2087 	 * here, where the rest of the code expects the receiver to sleep for
2088 	 * a while, anyways.
2089 	 */
2090 
2091 	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2092 	 * this defers syncer requests for some time, before letting at least
2093 	 * one request through.  The resync controller on the receiving side
2094 	 * will adapt to the incoming rate accordingly.
2095 	 *
2096 	 * We cannot throttle here if remote is Primary/SyncTarget:
2097 	 * we would also throttle its application reads.
2098 	 * In that case, throttling is done on the SyncTarget only.
2099 	 */
2100 	if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2101 		schedule_timeout_uninterruptible(HZ/10);
2102 	if (drbd_rs_begin_io(mdev, sector))
2103 		goto out_free_e;
2104 
2105 submit_for_resync:
2106 	atomic_add(size >> 9, &mdev->rs_sect_ev);
2107 
2108 submit:
2109 	inc_unacked(mdev);
2110 	spin_lock_irq(&mdev->req_lock);
2111 	list_add_tail(&e->w.list, &mdev->read_ee);
2112 	spin_unlock_irq(&mdev->req_lock);
2113 
2114 	if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2115 		return true;
2116 
2117 	/* don't care for the reason here */
2118 	dev_err(DEV, "submit failed, triggering re-connect\n");
2119 	spin_lock_irq(&mdev->req_lock);
2120 	list_del(&e->w.list);
2121 	spin_unlock_irq(&mdev->req_lock);
2122 	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
2123 
2124 out_free_e:
2125 	put_ldev(mdev);
2126 	drbd_free_ee(mdev, e);
2127 	return false;
2128 }
2129 
2130 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2131 {
2132 	int self, peer, rv = -100;
2133 	unsigned long ch_self, ch_peer;
2134 
2135 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2136 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2137 
2138 	ch_peer = mdev->p_uuid[UI_SIZE];
2139 	ch_self = mdev->comm_bm_set;
2140 
2141 	switch (mdev->net_conf->after_sb_0p) {
2142 	case ASB_CONSENSUS:
2143 	case ASB_DISCARD_SECONDARY:
2144 	case ASB_CALL_HELPER:
2145 		dev_err(DEV, "Configuration error.\n");
2146 		break;
2147 	case ASB_DISCONNECT:
2148 		break;
2149 	case ASB_DISCARD_YOUNGER_PRI:
2150 		if (self == 0 && peer == 1) {
2151 			rv = -1;
2152 			break;
2153 		}
2154 		if (self == 1 && peer == 0) {
2155 			rv =  1;
2156 			break;
2157 		}
2158 		/* Else fall through to one of the other strategies... */
2159 	case ASB_DISCARD_OLDER_PRI:
2160 		if (self == 0 && peer == 1) {
2161 			rv = 1;
2162 			break;
2163 		}
2164 		if (self == 1 && peer == 0) {
2165 			rv = -1;
2166 			break;
2167 		}
2168 		/* Else fall through to one of the other strategies... */
2169 		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2170 		     "Using discard-least-changes instead\n");
2171 	case ASB_DISCARD_ZERO_CHG:
2172 		if (ch_peer == 0 && ch_self == 0) {
2173 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2174 				? -1 : 1;
2175 			break;
2176 		} else {
2177 			if (ch_peer == 0) { rv =  1; break; }
2178 			if (ch_self == 0) { rv = -1; break; }
2179 		}
2180 		if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2181 			break;
2182 	case ASB_DISCARD_LEAST_CHG:
2183 		if	(ch_self < ch_peer)
2184 			rv = -1;
2185 		else if (ch_self > ch_peer)
2186 			rv =  1;
2187 		else /* ( ch_self == ch_peer ) */
2188 		     /* Well, then use something else. */
2189 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2190 				? -1 : 1;
2191 		break;
2192 	case ASB_DISCARD_LOCAL:
2193 		rv = -1;
2194 		break;
2195 	case ASB_DISCARD_REMOTE:
2196 		rv =  1;
2197 	}
2198 
2199 	return rv;
2200 }
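/* Example (illustrative): with after-sb-0pri = discard-least-changes, a node
 * that changed 100 blocks (ch_self) while the peer changed 4000 (ch_peer)
 * returns -1, i.e. it becomes sync target and discards its own 100 changes;
 * on a tie, the DISCARD_CONCURRENT flag breaks it deterministically. */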
2201 
2202 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2203 {
2204 	int hg, rv = -100;
2205 
2206 	switch (mdev->net_conf->after_sb_1p) {
2207 	case ASB_DISCARD_YOUNGER_PRI:
2208 	case ASB_DISCARD_OLDER_PRI:
2209 	case ASB_DISCARD_LEAST_CHG:
2210 	case ASB_DISCARD_LOCAL:
2211 	case ASB_DISCARD_REMOTE:
2212 		dev_err(DEV, "Configuration error.\n");
2213 		break;
2214 	case ASB_DISCONNECT:
2215 		break;
2216 	case ASB_CONSENSUS:
2217 		hg = drbd_asb_recover_0p(mdev);
2218 		if (hg == -1 && mdev->state.role == R_SECONDARY)
2219 			rv = hg;
2220 		if (hg == 1  && mdev->state.role == R_PRIMARY)
2221 			rv = hg;
2222 		break;
2223 	case ASB_VIOLENTLY:
2224 		rv = drbd_asb_recover_0p(mdev);
2225 		break;
2226 	case ASB_DISCARD_SECONDARY:
2227 		return mdev->state.role == R_PRIMARY ? 1 : -1;
2228 	case ASB_CALL_HELPER:
2229 		hg = drbd_asb_recover_0p(mdev);
2230 		if (hg == -1 && mdev->state.role == R_PRIMARY) {
2231 			enum drbd_state_rv rv2;
2232 
2233 			drbd_set_role(mdev, R_SECONDARY, 0);
2234 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2235 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2236 			  * we do not need to wait for the after state change work either. */
2237 			rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2238 			if (rv2 != SS_SUCCESS) {
2239 				drbd_khelper(mdev, "pri-lost-after-sb");
2240 			} else {
2241 				dev_warn(DEV, "Successfully gave up primary role.\n");
2242 				rv = hg;
2243 			}
2244 		} else
2245 			rv = hg;
2246 	}
2247 
2248 	return rv;
2249 }
2250 
2251 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2252 {
2253 	int hg, rv = -100;
2254 
2255 	switch (mdev->net_conf->after_sb_2p) {
2256 	case ASB_DISCARD_YOUNGER_PRI:
2257 	case ASB_DISCARD_OLDER_PRI:
2258 	case ASB_DISCARD_LEAST_CHG:
2259 	case ASB_DISCARD_LOCAL:
2260 	case ASB_DISCARD_REMOTE:
2261 	case ASB_CONSENSUS:
2262 	case ASB_DISCARD_SECONDARY:
2263 		dev_err(DEV, "Configuration error.\n");
2264 		break;
2265 	case ASB_VIOLENTLY:
2266 		rv = drbd_asb_recover_0p(mdev);
2267 		break;
2268 	case ASB_DISCONNECT:
2269 		break;
2270 	case ASB_CALL_HELPER:
2271 		hg = drbd_asb_recover_0p(mdev);
2272 		if (hg == -1) {
2273 			enum drbd_state_rv rv2;
2274 
2275 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2276 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2277 			  * we do not need to wait for the after state change work either. */
2278 			rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2279 			if (rv2 != SS_SUCCESS) {
2280 				drbd_khelper(mdev, "pri-lost-after-sb");
2281 			} else {
2282 				dev_warn(DEV, "Successfully gave up primary role.\n");
2283 				rv = hg;
2284 			}
2285 		} else
2286 			rv = hg;
2287 	}
2288 
2289 	return rv;
2290 }
2291 
2292 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2293 			   u64 bits, u64 flags)
2294 {
2295 	if (!uuid) {
2296 		dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2297 		return;
2298 	}
2299 	dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2300 	     text,
2301 	     (unsigned long long)uuid[UI_CURRENT],
2302 	     (unsigned long long)uuid[UI_BITMAP],
2303 	     (unsigned long long)uuid[UI_HISTORY_START],
2304 	     (unsigned long long)uuid[UI_HISTORY_END],
2305 	     (unsigned long long)bits,
2306 	     (unsigned long long)flags);
2307 }
2308 
2309 /*
2310   100	after split brain try auto recover
2311     2	C_SYNC_SOURCE set BitMap
2312     1	C_SYNC_SOURCE use BitMap
2313     0	no Sync
2314    -1	C_SYNC_TARGET use BitMap
2315    -2	C_SYNC_TARGET set BitMap
2316  -100	after split brain, disconnect
2317 -1000	unrelated data
2318 -1091   requires proto 91
2319 -1096   requires proto 96
2320  */
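/* Example reading of the table above (illustrative): if the peer's current
 * UUID matches one of our history UUIDs (rule 80 below), the peer holds an
 * ancestor of our data, so we return 2: become C_SYNC_SOURCE and set the
 * whole bitmap for a full sync.  Conversely, a return of -1 (e.g. rule 50)
 * means we become C_SYNC_TARGET using the existing bitmap. */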
2321 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2322 {
2323 	u64 self, peer;
2324 	int i, j;
2325 
2326 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2327 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2328 
2329 	*rule_nr = 10;
2330 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2331 		return 0;
2332 
2333 	*rule_nr = 20;
2334 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2335 	     peer != UUID_JUST_CREATED)
2336 		return -2;
2337 
2338 	*rule_nr = 30;
2339 	if (self != UUID_JUST_CREATED &&
2340 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
2341 		return 2;
2342 
2343 	if (self == peer) {
2344 		int rct, dc; /* roles at crash time */
2345 
2346 		if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2347 
2348 			if (mdev->agreed_pro_version < 91)
2349 				return -1091;
2350 
2351 			if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2352 			    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2353 				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2354 				drbd_uuid_set_bm(mdev, 0UL);
2355 
2356 				drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2357 					       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2358 				*rule_nr = 34;
2359 			} else {
2360 				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2361 				*rule_nr = 36;
2362 			}
2363 
2364 			return 1;
2365 		}
2366 
2367 		if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2368 
2369 			if (mdev->agreed_pro_version < 91)
2370 				return -1091;
2371 
2372 			if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2373 			    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2374 				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2375 
2376 				mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2377 				mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2378 				mdev->p_uuid[UI_BITMAP] = 0UL;
2379 
2380 				drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2381 				*rule_nr = 35;
2382 			} else {
2383 				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2384 				*rule_nr = 37;
2385 			}
2386 
2387 			return -1;
2388 		}
2389 
2390 		/* Common power [off|failure] */
2391 		rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2392 			(mdev->p_uuid[UI_FLAGS] & 2);
2393 		/* lowest bit is set when we were primary,
2394 		 * next bit (weight 2) is set when peer was primary */
2395 		*rule_nr = 40;
2396 
2397 		switch (rct) {
2398 		case 0: /* !self_pri && !peer_pri */ return 0;
2399 		case 1: /*  self_pri && !peer_pri */ return 1;
2400 		case 2: /* !self_pri &&  peer_pri */ return -1;
2401 		case 3: /*  self_pri &&  peer_pri */
2402 			dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2403 			return dc ? -1 : 1;
2404 		}
2405 	}
2406 
2407 	*rule_nr = 50;
2408 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2409 	if (self == peer)
2410 		return -1;
2411 
2412 	*rule_nr = 51;
2413 	peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2414 	if (self == peer) {
2415 		if (mdev->agreed_pro_version < 96 ?
2416 		    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2417 		    (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2418 		    peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2419 			/* The last P_SYNC_UUID did not get through. Undo the modifications
2420 			   the peer made to its UUIDs when it last started a resync as sync source. */
2421 
2422 			if (mdev->agreed_pro_version < 91)
2423 				return -1091;
2424 
2425 			mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2426 			mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2427 
2428 			dev_info(DEV, "Did not get the last syncUUID packet, corrected:\n");
2429 			drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2430 
2431 			return -1;
2432 		}
2433 	}
2434 
2435 	*rule_nr = 60;
2436 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2437 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2438 		peer = mdev->p_uuid[i] & ~((u64)1);
2439 		if (self == peer)
2440 			return -2;
2441 	}
2442 
2443 	*rule_nr = 70;
2444 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2445 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2446 	if (self == peer)
2447 		return 1;
2448 
2449 	*rule_nr = 71;
2450 	self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2451 	if (self == peer) {
2452 		if (mdev->agreed_pro_version < 96 ?
2453 		    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2454 		    (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2455 		    self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2456 			/* The last P_SYNC_UUID did not get through. Undo the modifications
2457 			   we made to our own UUIDs when we last started a resync as sync source. */
2458 
2459 			if (mdev->agreed_pro_version < 91)
2460 				return -1091;
2461 
2462 			_drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2463 			_drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2464 
2465 			dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2466 			drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2467 				       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2468 
2469 			return 1;
2470 		}
2471 	}
2472 
2473 
2474 	*rule_nr = 80;
2475 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2476 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2477 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2478 		if (self == peer)
2479 			return 2;
2480 	}
2481 
2482 	*rule_nr = 90;
2483 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2484 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2485 	if (self == peer && self != ((u64)0))
2486 		return 100;
2487 
2488 	*rule_nr = 100;
2489 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2490 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2491 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2492 			peer = mdev->p_uuid[j] & ~((u64)1);
2493 			if (self == peer)
2494 				return -100;
2495 		}
2496 	}
2497 
2498 	return -1000;
2499 }
2500 
2501 /* drbd_sync_handshake() returns the new conn state on success, or
2502    C_MASK (-1) on failure.
2503  */
2504 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2505 					   enum drbd_disk_state peer_disk) __must_hold(local)
2506 {
2507 	int hg, rule_nr;
2508 	enum drbd_conns rv = C_MASK;
2509 	enum drbd_disk_state mydisk;
2510 
2511 	mydisk = mdev->state.disk;
2512 	if (mydisk == D_NEGOTIATING)
2513 		mydisk = mdev->new_state_tmp.disk;
2514 
2515 	dev_info(DEV, "drbd_sync_handshake:\n");
2516 	drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2517 	drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2518 		       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2519 
2520 	hg = drbd_uuid_compare(mdev, &rule_nr);
2521 
2522 	dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2523 
2524 	if (hg == -1000) {
2525 		dev_alert(DEV, "Unrelated data, aborting!\n");
2526 		return C_MASK;
2527 	}
2528 	if (hg < -1000) {
2529 		dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2530 		return C_MASK;
2531 	}
2532 
2533 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2534 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2535 		int f = (hg == -100) || abs(hg) == 2;
2536 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
2537 		if (f)
2538 			hg = hg*2;
2539 		dev_info(DEV, "Becoming sync %s due to disk states.\n",
2540 		     hg > 0 ? "source" : "target");
2541 	}
2542 
2543 	if (abs(hg) == 100)
2544 		drbd_khelper(mdev, "initial-split-brain");
2545 
2546 	if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2547 		int pcount = (mdev->state.role == R_PRIMARY)
2548 			   + (peer_role == R_PRIMARY);
2549 		int forced = (hg == -100);
2550 
2551 		switch (pcount) {
2552 		case 0:
2553 			hg = drbd_asb_recover_0p(mdev);
2554 			break;
2555 		case 1:
2556 			hg = drbd_asb_recover_1p(mdev);
2557 			break;
2558 		case 2:
2559 			hg = drbd_asb_recover_2p(mdev);
2560 			break;
2561 		}
2562 		if (abs(hg) < 100) {
2563 			dev_warn(DEV, "Split-Brain detected, %d primaries, "
2564 			     "automatically solved. Sync from %s node\n",
2565 			     pcount, (hg < 0) ? "peer" : "this");
2566 			if (forced) {
2567 				dev_warn(DEV, "Doing a full sync, since"
2568 				     " UUIDs were ambiguous.\n");
2569 				hg = hg*2;
2570 			}
2571 		}
2572 	}
2573 
2574 	if (hg == -100) {
2575 		if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2576 			hg = -1;
2577 		if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2578 			hg = 1;
2579 
2580 		if (abs(hg) < 100)
2581 			dev_warn(DEV, "Split-Brain detected, manually solved. "
2582 			     "Sync from %s node\n",
2583 			     (hg < 0) ? "peer" : "this");
2584 	}
2585 
2586 	if (hg == -100) {
2587 		/* FIXME this log message is not correct if we end up here
2588 		 * after an attempted attach on a diskless node.
2589 		 * We just refuse to attach -- well, we drop the "connection"
2590 		 * to that disk, in a way... */
2591 		dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2592 		drbd_khelper(mdev, "split-brain");
2593 		return C_MASK;
2594 	}
2595 
2596 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
2597 		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2598 		return C_MASK;
2599 	}
2600 
2601 	if (hg < 0 && /* by intention we do not use mydisk here. */
2602 	    mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2603 		switch (mdev->net_conf->rr_conflict) {
2604 		case ASB_CALL_HELPER:
2605 			drbd_khelper(mdev, "pri-lost");
2606 			/* fall through */
2607 		case ASB_DISCONNECT:
2608 			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2609 			return C_MASK;
2610 		case ASB_VIOLENTLY:
2611 			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data "
2612 			     "assumption\n");
2613 		}
2614 	}
2615 
2616 	if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2617 		if (hg == 0)
2618 			dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2619 		else
2620 			dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2621 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2622 				 abs(hg) >= 2 ? "full" : "bit-map based");
2623 		return C_MASK;
2624 	}
2625 
2626 	if (abs(hg) >= 2) {
2627 		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2628 		if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2629 					BM_LOCKED_SET_ALLOWED))
2630 			return C_MASK;
2631 	}
2632 
2633 	if (hg > 0) { /* become sync source. */
2634 		rv = C_WF_BITMAP_S;
2635 	} else if (hg < 0) { /* become sync target */
2636 		rv = C_WF_BITMAP_T;
2637 	} else {
2638 		rv = C_CONNECTED;
2639 		if (drbd_bm_total_weight(mdev)) {
2640 			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2641 			     drbd_bm_total_weight(mdev));
2642 		}
2643 	}
2644 
2645 	return rv;
2646 }
2647 
2648 /* returns 1 if invalid */
2649 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2650 {
2651 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2652 	if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2653 	    (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2654 		return 0;
2655 
2656 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2657 	if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2658 	    self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2659 		return 1;
2660 
2661 	/* everything else is valid if they are equal on both sides. */
2662 	if (peer == self)
2663 		return 0;
2664 
2665 	/* everything else is invalid. */
2666 	return 1;
2667 }
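/* Usage sketch (illustrative): cmp_after_sb(ASB_DISCARD_REMOTE,
 * ASB_DISCARD_LOCAL) returns 0, since one side discarding "remote" while the
 * other discards "local" is the one asymmetric combination that is
 * consistent.  ASB_DISCARD_LOCAL on both sides returns 1 (invalid), and any
 * other strategy is only accepted when both sides configured the same value. */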
2668 
2669 static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2670 {
2671 	struct p_protocol *p = &mdev->data.rbuf.protocol;
2672 	int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2673 	int p_want_lose, p_two_primaries, cf;
2674 	char p_integrity_alg[SHARED_SECRET_MAX] = "";
2675 
2676 	p_proto		= be32_to_cpu(p->protocol);
2677 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
2678 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
2679 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
2680 	p_two_primaries = be32_to_cpu(p->two_primaries);
2681 	cf		= be32_to_cpu(p->conn_flags);
2682 	p_want_lose = cf & CF_WANT_LOSE;
2683 
2684 	clear_bit(CONN_DRY_RUN, &mdev->flags);
2685 
2686 	if (cf & CF_DRY_RUN)
2687 		set_bit(CONN_DRY_RUN, &mdev->flags);
2688 
2689 	if (p_proto != mdev->net_conf->wire_protocol) {
2690 		dev_err(DEV, "incompatible communication protocols\n");
2691 		goto disconnect;
2692 	}
2693 
2694 	if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2695 		dev_err(DEV, "incompatible after-sb-0pri settings\n");
2696 		goto disconnect;
2697 	}
2698 
2699 	if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2700 		dev_err(DEV, "incompatible after-sb-1pri settings\n");
2701 		goto disconnect;
2702 	}
2703 
2704 	if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2705 		dev_err(DEV, "incompatible after-sb-2pri settings\n");
2706 		goto disconnect;
2707 	}
2708 
2709 	if (p_want_lose && mdev->net_conf->want_lose) {
2710 		dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2711 		goto disconnect;
2712 	}
2713 
2714 	if (p_two_primaries != mdev->net_conf->two_primaries) {
2715 		dev_err(DEV, "incompatible setting of the two-primaries options\n");
2716 		goto disconnect;
2717 	}
2718 
2719 	if (mdev->agreed_pro_version >= 87) {
2720 		unsigned char *my_alg = mdev->net_conf->integrity_alg;
2721 
2722 		if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2723 			return false;
2724 
2725 		p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2726 		if (strcmp(p_integrity_alg, my_alg)) {
2727 			dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2728 			goto disconnect;
2729 		}
2730 		dev_info(DEV, "data-integrity-alg: %s\n",
2731 		     my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2732 	}
2733 
2734 	return true;
2735 
2736 disconnect:
2737 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2738 	return false;
2739 }
2740 
2741 /* helper function
2742  * input: alg name, feature name
2743  * return: NULL (alg name was "")
2744  *         ERR_PTR(error) if something goes wrong
2745  *         or the crypto hash ptr, if it worked out ok. */
2746 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2747 		const char *alg, const char *name)
2748 {
2749 	struct crypto_hash *tfm;
2750 
2751 	if (!alg[0])
2752 		return NULL;
2753 
2754 	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2755 	if (IS_ERR(tfm)) {
2756 		dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2757 			alg, name, PTR_ERR(tfm));
2758 		return tfm;
2759 	}
2760 	if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2761 		crypto_free_hash(tfm);
2762 		dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2763 		return ERR_PTR(-EINVAL);
2764 	}
2765 	return tfm;
2766 }
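/* Caller sketch (as used by receive_SyncParam() below) -- the three possible
 * return values map to three cases:
 *
 *	tfm = drbd_crypto_alloc_digest_safe(mdev, p->verify_alg, "verify-alg");
 *	if (IS_ERR(tfm))	// allocation failed, or alg is not a digest
 *		goto disconnect;
 *	if (tfm == NULL)	// empty alg name: feature not in use
 *		...
 *	// otherwise: a usable crypto_hash transform, to be freed with
 *	// crypto_free_hash() when replaced
 */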
2767 
2768 static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
2769 {
2770 	int ok = true;
2771 	struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
2772 	unsigned int header_size, data_size, exp_max_sz;
2773 	struct crypto_hash *verify_tfm = NULL;
2774 	struct crypto_hash *csums_tfm = NULL;
2775 	const int apv = mdev->agreed_pro_version;
2776 	int *rs_plan_s = NULL;
2777 	int fifo_size = 0;
2778 
2779 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2780 		    : apv == 88 ? sizeof(struct p_rs_param)
2781 					+ SHARED_SECRET_MAX
2782 		    : apv <= 94 ? sizeof(struct p_rs_param_89)
2783 		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
2784 
2785 	if (packet_size > exp_max_sz) {
2786 		dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2787 		    packet_size, exp_max_sz);
2788 		return false;
2789 	}
2790 
2791 	if (apv <= 88) {
2792 		header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
2793 		data_size   = packet_size  - header_size;
2794 	} else if (apv <= 94) {
2795 		header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
2796 		data_size   = packet_size  - header_size;
2797 		D_ASSERT(data_size == 0);
2798 	} else {
2799 		header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
2800 		data_size   = packet_size  - header_size;
2801 		D_ASSERT(data_size == 0);
2802 	}
2803 
2804 	/* initialize verify_alg and csums_alg */
2805 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2806 
2807 	if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
2808 		return false;
2809 
2810 	mdev->sync_conf.rate	  = be32_to_cpu(p->rate);
2811 
2812 	if (apv >= 88) {
2813 		if (apv == 88) {
2814 			if (data_size > SHARED_SECRET_MAX) {
2815 				dev_err(DEV, "verify-alg too long, "
2816 				    "peer wants %u, accepting only %u byte\n",
2817 						data_size, SHARED_SECRET_MAX);
2818 				return false;
2819 			}
2820 
2821 			if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2822 				return false;
2823 
2824 			/* we expect NUL terminated string */
2825 			/* but just in case someone tries to be evil */
2826 			D_ASSERT(p->verify_alg[data_size-1] == 0);
2827 			p->verify_alg[data_size-1] = 0;
2828 
2829 		} else /* apv >= 89 */ {
2830 			/* we still expect NUL terminated strings */
2831 			/* but just in case someone tries to be evil */
2832 			D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2833 			D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2834 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2835 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2836 		}
2837 
2838 		if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2839 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2840 				dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2841 				    mdev->sync_conf.verify_alg, p->verify_alg);
2842 				goto disconnect;
2843 			}
2844 			verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2845 					p->verify_alg, "verify-alg");
2846 			if (IS_ERR(verify_tfm)) {
2847 				verify_tfm = NULL;
2848 				goto disconnect;
2849 			}
2850 		}
2851 
2852 		if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2853 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2854 				dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2855 				    mdev->sync_conf.csums_alg, p->csums_alg);
2856 				goto disconnect;
2857 			}
2858 			csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2859 					p->csums_alg, "csums-alg");
2860 			if (IS_ERR(csums_tfm)) {
2861 				csums_tfm = NULL;
2862 				goto disconnect;
2863 			}
2864 		}
2865 
2866 		if (apv > 94) {
2867 			mdev->sync_conf.rate	  = be32_to_cpu(p->rate);
2868 			mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2869 			mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2870 			mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2871 			mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
2872 
2873 			fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2874 			if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2875 				rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2876 				if (!rs_plan_s) {
2877 					dev_err(DEV, "kzalloc of fifo_buffer failed\n");
2878 					goto disconnect;
2879 				}
2880 			}
2881 		}
2882 
2883 		spin_lock(&mdev->peer_seq_lock);
2884 		/* lock against drbd_nl_syncer_conf() */
2885 		if (verify_tfm) {
2886 			strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2887 			mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2888 			crypto_free_hash(mdev->verify_tfm);
2889 			mdev->verify_tfm = verify_tfm;
2890 			dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2891 		}
2892 		if (csums_tfm) {
2893 			strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2894 			mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2895 			crypto_free_hash(mdev->csums_tfm);
2896 			mdev->csums_tfm = csums_tfm;
2897 			dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2898 		}
2899 		if (fifo_size != mdev->rs_plan_s.size) {
2900 			kfree(mdev->rs_plan_s.values);
2901 			mdev->rs_plan_s.values = rs_plan_s;
2902 			mdev->rs_plan_s.size   = fifo_size;
2903 			mdev->rs_planed = 0;
2904 		}
2905 		spin_unlock(&mdev->peer_seq_lock);
2906 	}
2907 
2908 	return ok;
2909 disconnect:
2910 	/* just for completeness: actually not needed,
2911 	 * as this is not reached if csums_tfm was ok. */
2912 	crypto_free_hash(csums_tfm);
2913 	/* but free the verify_tfm again, if csums_tfm did not work out */
2914 	crypto_free_hash(verify_tfm);
2915 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2916 	return false;
2917 }
2918 
2919 static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2920 {
2921 	/* sorry, we currently have no working implementation
2922 	 * of distributed TCQ */
2923 }
2924 
2925 /* warn if the arguments differ by more than 12.5% */
2926 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2927 	const char *s, sector_t a, sector_t b)
2928 {
2929 	sector_t d;
2930 	if (a == 0 || b == 0)
2931 		return;
2932 	d = (a > b) ? (a - b) : (b - a);
2933 	if (d > (a>>3) || d > (b>>3))
2934 		dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2935 		     (unsigned long long)a, (unsigned long long)b);
2936 }
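/* Worked example for the 12.5% check above (hypothetical sizes): with
 * a = 1000 and b = 800 sectors, d = 200, a>>3 = 125 and b>>3 = 100; since
 * d > 125 we warn.  With a = 1000 and b = 950, d = 50 stays below both
 * thresholds, so the difference is considered insignificant. */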
2937 
2938 static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2939 {
2940 	struct p_sizes *p = &mdev->data.rbuf.sizes;
2941 	enum determine_dev_size dd = unchanged;
2942 	unsigned int max_bio_size;
2943 	sector_t p_size, p_usize, my_usize;
2944 	int ldsc = 0; /* local disk size changed */
2945 	enum dds_flags ddsf;
2946 
2947 	p_size = be64_to_cpu(p->d_size);
2948 	p_usize = be64_to_cpu(p->u_size);
2949 
2950 	if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2951 		dev_err(DEV, "some backing storage is needed\n");
2952 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2953 		return false;
2954 	}
2955 
2956 	/* just store the peer's disk size for now.
2957 	 * we still need to figure out whether we accept that. */
2958 	mdev->p_size = p_size;
2959 
2960 	if (get_ldev(mdev)) {
2961 		warn_if_differ_considerably(mdev, "lower level device sizes",
2962 			   p_size, drbd_get_max_capacity(mdev->ldev));
2963 		warn_if_differ_considerably(mdev, "user requested size",
2964 					    p_usize, mdev->ldev->dc.disk_size);
2965 
2966 		/* if this is the first connect, or an otherwise expected
2967 		 * param exchange, choose the minimum */
2968 		if (mdev->state.conn == C_WF_REPORT_PARAMS)
2969 			p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2970 					     p_usize);
2971 
2972 		my_usize = mdev->ldev->dc.disk_size;
2973 
2974 		if (mdev->ldev->dc.disk_size != p_usize) {
2975 			mdev->ldev->dc.disk_size = p_usize;
2976 			dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2977 			     (unsigned long)mdev->ldev->dc.disk_size);
2978 		}
2979 
2980 		/* Never shrink a device with usable data during connect.
2981 		   But allow online shrinking if we are connected. */
2982 		if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2983 		   drbd_get_capacity(mdev->this_bdev) &&
2984 		   mdev->state.disk >= D_OUTDATED &&
2985 		   mdev->state.conn < C_CONNECTED) {
2986 			dev_err(DEV, "The peer's disk size is too small!\n");
2987 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2988 			mdev->ldev->dc.disk_size = my_usize;
2989 			put_ldev(mdev);
2990 			return false;
2991 		}
2992 		put_ldev(mdev);
2993 	}
2994 
2995 	ddsf = be16_to_cpu(p->dds_flags);
2996 	if (get_ldev(mdev)) {
2997 		dd = drbd_determin_dev_size(mdev, ddsf);
2998 		put_ldev(mdev);
2999 		if (dd == dev_size_error)
3000 			return false;
3001 		drbd_md_sync(mdev);
3002 	} else {
3003 		/* I am diskless, need to accept the peer's size. */
3004 		drbd_set_my_capacity(mdev, p_size);
3005 	}
3006 
3007 	if (get_ldev(mdev)) {
3008 		if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3009 			mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3010 			ldsc = 1;
3011 		}
3012 
3013 		if (mdev->agreed_pro_version < 94)
3014 			max_bio_size = be32_to_cpu(p->max_bio_size);
3015 		else if (mdev->agreed_pro_version == 94)
3016 			max_bio_size = DRBD_MAX_SIZE_H80_PACKET;
3017 		else /* drbd 8.3.8 onwards */
3018 			max_bio_size = DRBD_MAX_BIO_SIZE;
3019 
3020 		if (max_bio_size != queue_max_hw_sectors(mdev->rq_queue) << 9)
3021 			drbd_setup_queue_param(mdev, max_bio_size);
3022 
3023 		drbd_setup_order_type(mdev, be16_to_cpu(p->queue_order_type));
3024 		put_ldev(mdev);
3025 	}
3026 
3027 	if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3028 		if (be64_to_cpu(p->c_size) !=
3029 		    drbd_get_capacity(mdev->this_bdev) || ldsc) {
3030 			/* we have different sizes, probably the peer
3031 			 * needs to know my new size... */
3032 			drbd_send_sizes(mdev, 0, ddsf);
3033 		}
3034 		if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3035 		    (dd == grew && mdev->state.conn == C_CONNECTED)) {
3036 			if (mdev->state.pdsk >= D_INCONSISTENT &&
3037 			    mdev->state.disk >= D_INCONSISTENT) {
3038 				if (ddsf & DDSF_NO_RESYNC)
3039 					dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3040 				else
3041 					resync_after_online_grow(mdev);
3042 			} else
3043 				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3044 		}
3045 	}
3046 
3047 	return true;
3048 }
3049 
3050 static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3051 {
3052 	struct p_uuids *p = &mdev->data.rbuf.uuids;
3053 	u64 *p_uuid;
3054 	int i, updated_uuids = 0;
3055 
3056 	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
	if (!p_uuid) {
		dev_err(DEV, "kmalloc of p_uuid failed\n");
		return false;
	}
3057 
3058 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3059 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
3060 
3061 	kfree(mdev->p_uuid);
3062 	mdev->p_uuid = p_uuid;
3063 
3064 	if (mdev->state.conn < C_CONNECTED &&
3065 	    mdev->state.disk < D_INCONSISTENT &&
3066 	    mdev->state.role == R_PRIMARY &&
3067 	    (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3068 		dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3069 		    (unsigned long long)mdev->ed_uuid);
3070 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3071 		return false;
3072 	}
3073 
3074 	if (get_ldev(mdev)) {
3075 		int skip_initial_sync =
3076 			mdev->state.conn == C_CONNECTED &&
3077 			mdev->agreed_pro_version >= 90 &&
3078 			mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3079 			(p_uuid[UI_FLAGS] & 8);
3080 		if (skip_initial_sync) {
3081 			dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3082 			drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3083 					"clear_n_write from receive_uuids",
3084 					BM_LOCKED_TEST_ALLOWED);
3085 			_drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3086 			_drbd_uuid_set(mdev, UI_BITMAP, 0);
3087 			_drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3088 					CS_VERBOSE, NULL);
3089 			drbd_md_sync(mdev);
3090 			updated_uuids = 1;
3091 		}
3092 		put_ldev(mdev);
3093 	} else if (mdev->state.disk < D_INCONSISTENT &&
3094 		   mdev->state.role == R_PRIMARY) {
3095 		/* I am a diskless primary, the peer just created a new current UUID
3096 		   for me. */
3097 		updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3098 	}
3099 
3100 	/* Before we test for the disk state, we should wait until a possibly
3101 	   ongoing cluster-wide state change is finished. That is important if
3102 	   we are primary and are detaching from our disk. We need to see the
3103 	   new disk state... */
3104 	wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3105 	if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3106 		updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3107 
3108 	if (updated_uuids)
3109 		drbd_print_uuids(mdev, "receiver updated UUIDs to");
3110 
3111 	return true;
3112 }
3113 
3114 /**
3115  * convert_state() - Converts the peer's view of the cluster state to our point of view
3116  * @ps:		The state as seen by the peer.
3117  */
3118 static union drbd_state convert_state(union drbd_state ps)
3119 {
3120 	union drbd_state ms;
3121 
3122 	static enum drbd_conns c_tab[] = {
3123 		[C_CONNECTED] = C_CONNECTED,
3124 
3125 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3126 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3127 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3128 		[C_VERIFY_S]       = C_VERIFY_T,
3129 		[C_MASK]   = C_MASK,
3130 	};
3131 
3132 	ms.i = ps.i;
3133 
3134 	ms.conn = c_tab[ps.conn];
3135 	ms.peer = ps.role;
3136 	ms.role = ps.peer;
3137 	ms.pdsk = ps.disk;
3138 	ms.disk = ps.pdsk;
3139 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3140 
3141 	return ms;
3142 }
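/* Example (illustrative): if the peer reports role=R_PRIMARY, peer=R_SECONDARY,
 * disk=D_UP_TO_DATE, pdsk=D_INCONSISTENT and conn=C_STARTING_SYNC_S, the
 * converted view is peer=R_PRIMARY, role=R_SECONDARY, pdsk=D_UP_TO_DATE,
 * disk=D_INCONSISTENT and conn=C_STARTING_SYNC_T -- role/peer and disk/pdsk
 * swap, and the connection state is mirrored via c_tab[]. */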
3143 
3144 static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3145 {
3146 	struct p_req_state *p = &mdev->data.rbuf.req_state;
3147 	union drbd_state mask, val;
3148 	enum drbd_state_rv rv;
3149 
3150 	mask.i = be32_to_cpu(p->mask);
3151 	val.i = be32_to_cpu(p->val);
3152 
3153 	if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3154 	    test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3155 		drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3156 		return true;
3157 	}
3158 
3159 	mask = convert_state(mask);
3160 	val = convert_state(val);
3161 
3162 	rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3163 
3164 	drbd_send_sr_reply(mdev, rv);
3165 	drbd_md_sync(mdev);
3166 
3167 	return true;
3168 }
3169 
3170 static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3171 {
3172 	struct p_state *p = &mdev->data.rbuf.state;
3173 	union drbd_state os, ns, peer_state;
3174 	enum drbd_disk_state real_peer_disk;
3175 	enum chg_state_flags cs_flags;
3176 	int rv;
3177 
3178 	peer_state.i = be32_to_cpu(p->state);
3179 
3180 	real_peer_disk = peer_state.disk;
3181 	if (peer_state.disk == D_NEGOTIATING) {
3182 		real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3183 		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3184 	}
3185 
3186 	spin_lock_irq(&mdev->req_lock);
3187  retry:
3188 	os = ns = mdev->state;
3189 	spin_unlock_irq(&mdev->req_lock);
3190 
3191 	/* peer says his disk is uptodate, while we think it is inconsistent,
3192 	 * and this happens while we think we have a sync going on. */
3193 	if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3194 	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3195 		/* If we are (becoming) SyncSource, but peer is still in sync
3196 		 * preparation, ignore its uptodate-ness to avoid flapping, it
3197 		 * will change to inconsistent once the peer reaches active
3198 		 * syncing states.
3199 		 * It may have changed syncer-paused flags, however, so we
3200 		 * cannot ignore this completely. */
3201 		if (peer_state.conn > C_CONNECTED &&
3202 		    peer_state.conn < C_SYNC_SOURCE)
3203 			real_peer_disk = D_INCONSISTENT;
3204 
3205 		/* if peer_state changes to connected at the same time,
3206 		 * it explicitly notifies us that it finished resync.
3207 		 * Maybe we should finish it up, too? */
3208 		else if (os.conn >= C_SYNC_SOURCE &&
3209 			 peer_state.conn == C_CONNECTED) {
3210 			if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3211 				drbd_resync_finished(mdev);
3212 			return true;
3213 		}
3214 	}
3215 
3216 	/* peer says his disk is inconsistent, while we think it is uptodate,
3217 	 * and this happens while the peer still thinks we have a sync going on,
3218 	 * but we think we are already done with the sync.
3219 	 * We ignore this to avoid flapping pdsk.
3220 	 * This should not happen, if the peer is a recent version of drbd. */
3221 	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3222 	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3223 		real_peer_disk = D_UP_TO_DATE;
3224 
3225 	if (ns.conn == C_WF_REPORT_PARAMS)
3226 		ns.conn = C_CONNECTED;
3227 
3228 	if (peer_state.conn == C_AHEAD)
3229 		ns.conn = C_BEHIND;
3230 
3231 	if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3232 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
3233 		int cr; /* consider resync */
3234 
3235 		/* if we established a new connection */
3236 		cr  = (os.conn < C_CONNECTED);
3237 		/* if we had an established connection
3238 		 * and one of the nodes newly attaches a disk */
3239 		cr |= (os.conn == C_CONNECTED &&
3240 		       (peer_state.disk == D_NEGOTIATING ||
3241 			os.disk == D_NEGOTIATING));
3242 		/* if we have both been inconsistent, and the peer has been
3243 		 * forced to be UpToDate with --overwrite-data */
3244 		cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3245 		/* if we had been plain connected, and the admin requested to
3246 		 * start a sync by "invalidate" or "invalidate-remote" */
3247 		cr |= (os.conn == C_CONNECTED &&
3248 				(peer_state.conn >= C_STARTING_SYNC_S &&
3249 				 peer_state.conn <= C_WF_BITMAP_T));
3250 
3251 		if (cr)
3252 			ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3253 
3254 		put_ldev(mdev);
3255 		if (ns.conn == C_MASK) {
3256 			ns.conn = C_CONNECTED;
3257 			if (mdev->state.disk == D_NEGOTIATING) {
3258 				drbd_force_state(mdev, NS(disk, D_FAILED));
3259 			} else if (peer_state.disk == D_NEGOTIATING) {
3260 				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3261 				peer_state.disk = D_DISKLESS;
3262 				real_peer_disk = D_DISKLESS;
3263 			} else {
3264 				if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3265 					return false;
3266 				D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3267 				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3268 				return false;
3269 			}
3270 		}
3271 	}
3272 
3273 	spin_lock_irq(&mdev->req_lock);
3274 	if (mdev->state.i != os.i)
3275 		goto retry;
3276 	clear_bit(CONSIDER_RESYNC, &mdev->flags);
3277 	ns.peer = peer_state.role;
3278 	ns.pdsk = real_peer_disk;
3279 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3280 	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3281 		ns.disk = mdev->new_state_tmp.disk;
3282 	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3283 	if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3284 	    test_bit(NEW_CUR_UUID, &mdev->flags)) {
3285 		/* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
3286 		   for temporary network outages! */
3287 		spin_unlock_irq(&mdev->req_lock);
3288 		dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3289 		tl_clear(mdev);
3290 		drbd_uuid_new_current(mdev);
3291 		clear_bit(NEW_CUR_UUID, &mdev->flags);
3292 		drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3293 		return false;
3294 	}
3295 	rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3296 	ns = mdev->state;
3297 	spin_unlock_irq(&mdev->req_lock);
3298 
3299 	if (rv < SS_SUCCESS) {
3300 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3301 		return false;
3302 	}
3303 
3304 	if (os.conn > C_WF_REPORT_PARAMS) {
3305 		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3306 		    peer_state.disk != D_NEGOTIATING ) {
3307 			/* we want resync, peer has not yet decided to sync... */
3308 			/* Nowadays only used when forcing a node into primary role and
3309 			   setting its disk to UpToDate with that */
3310 			drbd_send_uuids(mdev);
3311 			drbd_send_state(mdev);
3312 		}
3313 	}
3314 
3315 	mdev->net_conf->want_lose = 0;
3316 
3317 	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3318 
3319 	return true;
3320 }
3321 
3322 static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3323 {
3324 	struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
3325 
3326 	wait_event(mdev->misc_wait,
3327 		   mdev->state.conn == C_WF_SYNC_UUID ||
3328 		   mdev->state.conn == C_BEHIND ||
3329 		   mdev->state.conn < C_CONNECTED ||
3330 		   mdev->state.disk < D_NEGOTIATING);
3331 
3332 	/* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3333 
3334 	/* Here the _drbd_uuid_ functions are right, current should
3335 	   _not_ be rotated into the history */
3336 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3337 		_drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3338 		_drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3339 
3340 		drbd_print_uuids(mdev, "updated sync uuid");
3341 		drbd_start_resync(mdev, C_SYNC_TARGET);
3342 
3343 		put_ldev(mdev);
3344 	} else
3345 		dev_err(DEV, "Ignoring SyncUUID packet!\n");
3346 
3347 	return true;
3348 }
3349 
3350 /**
3351  * receive_bitmap_plain
3352  *
3353  * Return 0 when done, 1 when another iteration is needed, and a negative error
3354  * code upon failure.
3355  */
3356 static int
3357 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3358 		     unsigned long *buffer, struct bm_xfer_ctx *c)
3359 {
3360 	unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3361 	unsigned want = num_words * sizeof(long);
3362 	int err;
3363 
3364 	if (want != data_size) {
3365 		dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3366 		return -EIO;
3367 	}
3368 	if (want == 0)
3369 		return 0;
3370 	err = drbd_recv(mdev, buffer, want);
3371 	if (err != want) {
3372 		if (err >= 0)
3373 			err = -EIO;
3374 		return err;
3375 	}
3376 
3377 	drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3378 
3379 	c->word_offset += num_words;
3380 	c->bit_offset = c->word_offset * BITS_PER_LONG;
3381 	if (c->bit_offset > c->bm_bits)
3382 		c->bit_offset = c->bm_bits;
3383 
3384 	return 1;
3385 }
3386 
3387 /**
3388  * recv_bm_rle_bits
3389  *
3390  * Return 0 when done, 1 when another iteration is needed, and a negative error
3391  * code upon failure.
3392  */
3393 static int
3394 recv_bm_rle_bits(struct drbd_conf *mdev,
3395 		struct p_compressed_bm *p,
3396 		struct bm_xfer_ctx *c)
3397 {
3398 	struct bitstream bs;
3399 	u64 look_ahead;
3400 	u64 rl;
3401 	u64 tmp;
3402 	unsigned long s = c->bit_offset;
3403 	unsigned long e;
3404 	int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));
3405 	int toggle = DCBP_get_start(p);
3406 	int have;
3407 	int bits;
3408 
3409 	bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3410 
3411 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
3412 	if (bits < 0)
3413 		return -EIO;
3414 
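	/* look_ahead buffers up to 64 not yet decoded bits.  Each iteration
	 * decodes one run length, sets the corresponding bits if the run is
	 * an out-of-sync ("toggle") run, and then refills look_ahead from
	 * the bitstream. */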
3415 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
3416 		bits = vli_decode_bits(&rl, look_ahead);
3417 		if (bits <= 0)
3418 			return -EIO;
3419 
3420 		if (toggle) {
3421 			e = s + rl - 1;
3422 			if (e >= c->bm_bits) {
3423 				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3424 				return -EIO;
3425 			}
3426 			_drbd_bm_set_bits(mdev, s, e);
3427 		}
3428 
3429 		if (have < bits) {
3430 			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3431 				have, bits, look_ahead,
3432 				(unsigned int)(bs.cur.b - p->code),
3433 				(unsigned int)bs.buf_len);
3434 			return -EIO;
3435 		}
3436 		look_ahead >>= bits;
3437 		have -= bits;
3438 
3439 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3440 		if (bits < 0)
3441 			return -EIO;
3442 		look_ahead |= tmp << have;
3443 		have += bits;
3444 	}
3445 
3446 	c->bit_offset = s;
3447 	bm_xfer_ctx_bit_to_word_offset(c);
3448 
3449 	return (s != c->bm_bits);
3450 }
3451 
3452 /**
3453  * decode_bitmap_c
3454  *
3455  * Return 0 when done, 1 when another iteration is needed, and a negative error
3456  * code upon failure.
3457  */
3458 static int
3459 decode_bitmap_c(struct drbd_conf *mdev,
3460 		struct p_compressed_bm *p,
3461 		struct bm_xfer_ctx *c)
3462 {
3463 	if (DCBP_get_code(p) == RLE_VLI_Bits)
3464 		return recv_bm_rle_bits(mdev, p, c);
3465 
3466 	/* other variants had been implemented for evaluation,
3467 	 * but have been dropped as this one turned out to be "best"
3468 	 * during all our tests. */
3469 
3470 	dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3471 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3472 	return -EIO;
3473 }
3474 
3475 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3476 		const char *direction, struct bm_xfer_ctx *c)
3477 {
3478 	/* what would it take to transfer it "plaintext" */
3479 	unsigned plain = sizeof(struct p_header80) *
3480 		((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3481 		+ c->bm_words * sizeof(long);
3482 	unsigned total = c->bytes[0] + c->bytes[1];
3483 	unsigned r;
3484 
3485 	/* total cannot be zero, but just in case: */
3486 	if (total == 0)
3487 		return;
3488 
3489 	/* don't report if not compressed */
3490 	if (total >= plain)
3491 		return;
3492 
3493 	/* total < plain. check for overflow, still */
3494 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3495 		                    : (1000 * total / plain);
3496 
3497 	if (r > 1000)
3498 		r = 1000;
3499 
3500 	r = 1000 - r;
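	/* r is now the per-mille saving; e.g. plain=4096, total=512 gives
	 * r = 1000 - 125 = 875, reported below as "compression: 87.5%" */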
3501 	dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3502 	     "total %u; compression: %u.%u%%\n",
3503 			direction,
3504 			c->bytes[1], c->packets[1],
3505 			c->bytes[0], c->packets[0],
3506 			total, r/10, r % 10);
3507 }
3508 
3509 /* Since we are processing the bitfield from lower addresses to higher,
3510    it does not matter whether we process it in 32 bit or 64 bit chunks,
3511    as long as it is little endian. (Understand it as a byte stream,
3512    beginning with the lowest byte...) If we used big endian,
3513    we would need to process it from the highest address to the lowest
3514    in order to be agnostic to the 32 vs 64 bit issue.
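   For example, bit 0 of the bitmap travels in the least significant bit of
   the first byte on the wire, no matter whether the receiver merges 32 bit
   or 64 bit words.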
3515 
3516    returns 0 on failure, 1 if we successfully received it. */
3517 static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3518 {
3519 	struct bm_xfer_ctx c;
3520 	void *buffer;
3521 	int err;
3522 	int ok = false;
3523 	struct p_header80 *h = &mdev->data.rbuf.header.h80;
3524 
3525 	drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3526 	/* you are supposed to send additional out-of-sync information
3527 	 * if you actually set bits during this phase */
3528 
3529 	/* maybe we should use some per thread scratch page,
3530 	 * and allocate that during initial device creation? */
3531 	buffer	 = (unsigned long *) __get_free_page(GFP_NOIO);
3532 	if (!buffer) {
3533 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3534 		goto out;
3535 	}
3536 
3537 	c = (struct bm_xfer_ctx) {
3538 		.bm_bits = drbd_bm_bits(mdev),
3539 		.bm_words = drbd_bm_words(mdev),
3540 	};
3541 
3542 	for(;;) {
3543 		if (cmd == P_BITMAP) {
3544 			err = receive_bitmap_plain(mdev, data_size, buffer, &c);
3545 		} else if (cmd == P_COMPRESSED_BITMAP) {
3546 			/* MAYBE: sanity check that we speak proto >= 90,
3547 			 * and the feature is enabled! */
3548 			struct p_compressed_bm *p;
3549 
3550 			if (data_size > BM_PACKET_PAYLOAD_BYTES) {
3551 				dev_err(DEV, "ReportCBitmap packet too large\n");
3552 				goto out;
3553 			}
3554 			/* use the page buffer */
3555 			p = buffer;
3556 			memcpy(p, h, sizeof(*h));
3557 			if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
3558 				goto out;
3559 			if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3560 				dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
3561 				goto out;
3562 			}
3563 			err = decode_bitmap_c(mdev, p, &c);
3564 		} else {
3565 			dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", cmd);
3566 			goto out;
3567 		}
3568 
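		/* index 0 counts compressed (RLE) packets/bytes, index 1 plain
		 * bitmap packets/bytes; INFO_bm_xfer_stats() relies on that. */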
3569 		c.packets[cmd == P_BITMAP]++;
3570 		c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
3571 
3572 		if (err <= 0) {
3573 			if (err < 0)
3574 				goto out;
3575 			break;
3576 		}
3577 		if (!drbd_recv_header(mdev, &cmd, &data_size))
3578 			goto out;
3579 	}
3580 
3581 	INFO_bm_xfer_stats(mdev, "receive", &c);
3582 
3583 	if (mdev->state.conn == C_WF_BITMAP_T) {
3584 		enum drbd_state_rv rv;
3585 
3586 		ok = !drbd_send_bitmap(mdev);
3587 		if (!ok)
3588 			goto out;
3589 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3590 		rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3591 		D_ASSERT(rv == SS_SUCCESS);
3592 	} else if (mdev->state.conn != C_WF_BITMAP_S) {
3593 		/* admin may have requested C_DISCONNECTING,
3594 		 * other threads may have noticed network errors */
3595 		dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3596 		    drbd_conn_str(mdev->state.conn));
3597 	}
3598 
3599 	ok = true;
3600  out:
3601 	drbd_bm_unlock(mdev);
3602 	if (ok && mdev->state.conn == C_WF_BITMAP_S)
3603 		drbd_start_resync(mdev, C_SYNC_SOURCE);
3604 	free_page((unsigned long) buffer);
3605 	return ok;
3606 }
3607 
3608 static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3609 {
3610 	/* TODO zero copy sink :) */
3611 	static char sink[128];
3612 	int size, want, r;
3613 
3614 	dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3615 		 cmd, data_size);
3616 
3617 	size = data_size;
3618 	while (size > 0) {
3619 		want = min_t(int, size, sizeof(sink));
3620 		r = drbd_recv(mdev, sink, want);
3621 		ERR_IF(r <= 0) break;
3622 		size -= r;
3623 	}
3624 	return size == 0;
3625 }
3626 
3627 static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3628 {
3629 	/* Make sure we've acked all the TCP data associated
3630 	 * with the data requests being unplugged */
3631 	drbd_tcp_quickack(mdev->data.socket);
3632 
3633 	return true;
3634 }
3635 
3636 static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3637 {
3638 	struct p_block_desc *p = &mdev->data.rbuf.block_desc;
3639 
3640 	switch (mdev->state.conn) {
3641 	case C_WF_SYNC_UUID:
3642 	case C_WF_BITMAP_T:
3643 	case C_BEHIND:
3644 		break;
3645 	default:
3646 		dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3647 				drbd_conn_str(mdev->state.conn));
3648 	}
3649 
3650 	drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3651 
3652 	return true;
3653 }
3654 
3655 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
3656 
3657 struct data_cmd {
3658 	int expect_payload;
3659 	size_t pkt_size;
3660 	drbd_cmd_handler_f function;
3661 };
3662 
3663 static struct data_cmd drbd_cmd_handler[] = {
3664 	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
3665 	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
3666 	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3667 	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3668 	[P_BITMAP]	    = { 1, sizeof(struct p_header80), receive_bitmap } ,
3669 	[P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
3670 	[P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header80), receive_UnplugRemote },
3671 	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
3672 	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3673 	[P_SYNC_PARAM]	    = { 1, sizeof(struct p_header80), receive_SyncParam },
3674 	[P_SYNC_PARAM89]    = { 1, sizeof(struct p_header80), receive_SyncParam },
3675 	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
3676 	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
3677 	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
3678 	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
3679 	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
3680 	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3681 	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
3682 	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
3683 	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3684 	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
3685 	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3686 	/* anything missing from this table is in
3687 	 * the asender_tbl, see get_asender_cmd */
3688 	[P_MAX_CMD]	    = { 0, 0, NULL },
3689 };
3690 
3691 /* All handler functions that expect a sub-header get that sub-header in
3692    mdev->data.rbuf.header.head.payload.
3693 
3694    Usually the callback can find the usual p_header in mdev->data.rbuf.header.head,
3695    but it may not rely on that, since there is also p_header95.
3696  */
3697 
3698 static void drbdd(struct drbd_conf *mdev)
3699 {
3700 	union p_header *header = &mdev->data.rbuf.header;
3701 	unsigned int packet_size;
3702 	enum drbd_packets cmd;
3703 	size_t shs; /* sub header size */
3704 	int rv;
3705 
3706 	while (get_t_state(&mdev->receiver) == Running) {
3707 		drbd_thread_current_set_cpu(mdev);
3708 		if (!drbd_recv_header(mdev, &cmd, &packet_size))
3709 			goto err_out;
3710 
3711 		if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3712 			dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3713 			goto err_out;
3714 		}
3715 
3716 		shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
3717 		if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3718 			dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3719 			goto err_out;
3720 		}
3721 
3722 		if (shs) {
3723 			rv = drbd_recv(mdev, &header->h80.payload, shs);
3724 			if (unlikely(rv != shs)) {
3725 				if (!signal_pending(current))
3726 					dev_warn(DEV, "short read while reading sub header: rv=%d\n", rv);
3727 				goto err_out;
3728 			}
3729 		}
3730 
3731 		rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
3732 
3733 		if (unlikely(!rv)) {
3734 			dev_err(DEV, "error receiving %s, l: %d!\n",
3735 			    cmdname(cmd), packet_size);
3736 			goto err_out;
3737 		}
3738 	}
3739 
3740 	if (0) {
3741 	err_out:
3742 		drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3743 	}
3744 	/* If we leave here, we probably want to update at least the
3745 	 * "Connected" indicator on stable storage. Do so explicitly here. */
3746 	drbd_md_sync(mdev);
3747 }
3748 
3749 void drbd_flush_workqueue(struct drbd_conf *mdev)
3750 {
3751 	struct drbd_wq_barrier barr;
3752 
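	/* Queue a no-op work item and wait for its completion; once it has
	 * run, everything queued before it has been processed as well. */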
3753 	barr.w.cb = w_prev_work_done;
3754 	init_completion(&barr.done);
3755 	drbd_queue_work(&mdev->data.work, &barr.w);
3756 	wait_for_completion(&barr.done);
3757 }
3758 
3759 void drbd_free_tl_hash(struct drbd_conf *mdev)
3760 {
3761 	struct hlist_head *h;
3762 
3763 	spin_lock_irq(&mdev->req_lock);
3764 
3765 	if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
3766 		spin_unlock_irq(&mdev->req_lock);
3767 		return;
3768 	}
3769 	/* paranoia code */
3770 	for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3771 		if (h->first)
3772 			dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3773 				(int)(h - mdev->ee_hash), h->first);
3774 	kfree(mdev->ee_hash);
3775 	mdev->ee_hash = NULL;
3776 	mdev->ee_hash_s = 0;
3777 
3778 	/* paranoia code */
3779 	for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3780 		if (h->first)
3781 			dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3782 				(int)(h - mdev->tl_hash), h->first);
3783 	kfree(mdev->tl_hash);
3784 	mdev->tl_hash = NULL;
3785 	mdev->tl_hash_s = 0;
3786 	spin_unlock_irq(&mdev->req_lock);
3787 }
3788 
3789 static void drbd_disconnect(struct drbd_conf *mdev)
3790 {
3791 	enum drbd_fencing_p fp;
3792 	union drbd_state os, ns;
3793 	int rv = SS_UNKNOWN_ERROR;
3794 	unsigned int i;
3795 
3796 	if (mdev->state.conn == C_STANDALONE)
3797 		return;
3798 
3799 	/* asender does not clean up anything. it must not interfere, either */
3800 	drbd_thread_stop(&mdev->asender);
3801 	drbd_free_sock(mdev);
3802 
3803 	/* wait for current activity to cease. */
3804 	spin_lock_irq(&mdev->req_lock);
3805 	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3806 	_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3807 	_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3808 	spin_unlock_irq(&mdev->req_lock);
3809 
3810 	/* We do not have data structures that would allow us to
3811 	 * get the rs_pending_cnt down to 0 again.
3812 	 *  * On C_SYNC_TARGET we do not have any data structures describing
3813 	 *    the pending RSDataRequest's we have sent.
3814 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
3815 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3816 	 *  And no, it is not the sum of the reference counts in the
3817 	 *  resync_LRU. The resync_LRU tracks the whole operation including
3818 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3819 	 *  on the fly. */
3820 	drbd_rs_cancel_all(mdev);
3821 	mdev->rs_total = 0;
3822 	mdev->rs_failed = 0;
3823 	atomic_set(&mdev->rs_pending_cnt, 0);
3824 	wake_up(&mdev->misc_wait);
3825 
3826 	del_timer(&mdev->request_timer);
3827 
3828 	/* make sure syncer is stopped and w_resume_next_sg queued */
3829 	del_timer_sync(&mdev->resync_timer);
3830 	resync_timer_fn((unsigned long)mdev);
3831 
3832 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3833 	 * w_make_resync_request etc. which may still be on the worker queue
3834 	 * to be "canceled" */
3835 	drbd_flush_workqueue(mdev);
3836 
3837 	/* This also does reclaim_net_ee().  If we do this too early, we might
3838 	 * miss some resync ee and pages.*/
3839 	drbd_process_done_ee(mdev);
3840 
3841 	kfree(mdev->p_uuid);
3842 	mdev->p_uuid = NULL;
3843 
3844 	if (!is_susp(mdev->state))
3845 		tl_clear(mdev);
3846 
3847 	dev_info(DEV, "Connection closed\n");
3848 
3849 	drbd_md_sync(mdev);
3850 
3851 	fp = FP_DONT_CARE;
3852 	if (get_ldev(mdev)) {
3853 		fp = mdev->ldev->dc.fencing;
3854 		put_ldev(mdev);
3855 	}
3856 
3857 	if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3858 		drbd_try_outdate_peer_async(mdev);
3859 
3860 	spin_lock_irq(&mdev->req_lock);
3861 	os = mdev->state;
3862 	if (os.conn >= C_UNCONNECTED) {
3863 		/* Do not restart in case we are C_DISCONNECTING */
3864 		ns = os;
3865 		ns.conn = C_UNCONNECTED;
3866 		rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3867 	}
3868 	spin_unlock_irq(&mdev->req_lock);
3869 
3870 	if (os.conn == C_DISCONNECTING) {
3871 		wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
3872 
3873 		crypto_free_hash(mdev->cram_hmac_tfm);
3874 		mdev->cram_hmac_tfm = NULL;
3875 
3876 		kfree(mdev->net_conf);
3877 		mdev->net_conf = NULL;
3878 		drbd_request_state(mdev, NS(conn, C_STANDALONE));
3879 	}
3880 
3881 	/* serialize with bitmap writeout triggered by the state change,
3882 	 * if any. */
3883 	wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
3884 
3885 	/* tcp_close and release of sendpage pages can be deferred.  I don't
3886 	 * want to use SO_LINGER, because apparently it can be deferred for
3887 	 * more than 20 seconds (longest time I checked).
3888 	 *
3889 	 * Actually we don't care for exactly when the network stack does its
3890 	 * put_page(), but release our reference on these pages right here.
3891 	 */
3892 	i = drbd_release_ee(mdev, &mdev->net_ee);
3893 	if (i)
3894 		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3895 	i = atomic_read(&mdev->pp_in_use_by_net);
3896 	if (i)
3897 		dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
3898 	i = atomic_read(&mdev->pp_in_use);
3899 	if (i)
3900 		dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3901 
3902 	D_ASSERT(list_empty(&mdev->read_ee));
3903 	D_ASSERT(list_empty(&mdev->active_ee));
3904 	D_ASSERT(list_empty(&mdev->sync_ee));
3905 	D_ASSERT(list_empty(&mdev->done_ee));
3906 
3907 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3908 	atomic_set(&mdev->current_epoch->epoch_size, 0);
3909 	D_ASSERT(list_empty(&mdev->current_epoch->list));
3910 }
3911 
3912 /*
3913  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3914  * we can agree on is stored in agreed_pro_version.
3915  *
3916  * feature flags and the reserved array should be enough room for future
3917  * enhancements of the handshake protocol, and possible plugins...
3918  *
3919  * for now, they are expected to be zero, but ignored.
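 *
 * Example with made-up ranges: if we support 86..96 and the peer announces
 * 90..100, both sides agree on min(96, 100) = 96.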
3920  */
3921 static int drbd_send_handshake(struct drbd_conf *mdev)
3922 {
3923 	/* ASSERT current == mdev->receiver ... */
3924 	struct p_handshake *p = &mdev->data.sbuf.handshake;
3925 	int ok;
3926 
3927 	if (mutex_lock_interruptible(&mdev->data.mutex)) {
3928 		dev_err(DEV, "interrupted during initial handshake\n");
3929 		return 0; /* interrupted. not ok. */
3930 	}
3931 
3932 	if (mdev->data.socket == NULL) {
3933 		mutex_unlock(&mdev->data.mutex);
3934 		return 0;
3935 	}
3936 
3937 	memset(p, 0, sizeof(*p));
3938 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3939 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3940 	ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3941 			     (struct p_header80 *)p, sizeof(*p), 0 );
3942 	mutex_unlock(&mdev->data.mutex);
3943 	return ok;
3944 }
3945 
3946 /*
3947  * return values:
3948  *   1 yes, we have a valid connection
3949  *   0 oops, did not work out, please try again
3950  *  -1 peer talks different language,
3951  *     no point in trying again, please go standalone.
3952  */
3953 static int drbd_do_handshake(struct drbd_conf *mdev)
3954 {
3955 	/* ASSERT current == mdev->receiver ... */
3956 	struct p_handshake *p = &mdev->data.rbuf.handshake;
3957 	const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
3958 	unsigned int length;
3959 	enum drbd_packets cmd;
3960 	int rv;
3961 
3962 	rv = drbd_send_handshake(mdev);
3963 	if (!rv)
3964 		return 0;
3965 
3966 	rv = drbd_recv_header(mdev, &cmd, &length);
3967 	if (!rv)
3968 		return 0;
3969 
3970 	if (cmd != P_HAND_SHAKE) {
3971 		dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3972 		     cmdname(cmd), cmd);
3973 		return -1;
3974 	}
3975 
3976 	if (length != expect) {
3977 		dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3978 		     expect, length);
3979 		return -1;
3980 	}
3981 
3982 	rv = drbd_recv(mdev, &p->head.payload, expect);
3983 
3984 	if (rv != expect) {
3985 		if (!signal_pending(current))
3986 			dev_warn(DEV, "short read receiving handshake packet: l=%u\n", rv);
3987 		return 0;
3988 	}
3989 
3990 	p->protocol_min = be32_to_cpu(p->protocol_min);
3991 	p->protocol_max = be32_to_cpu(p->protocol_max);
3992 	if (p->protocol_max == 0)
3993 		p->protocol_max = p->protocol_min;
3994 
3995 	if (PRO_VERSION_MAX < p->protocol_min ||
3996 	    PRO_VERSION_MIN > p->protocol_max)
3997 		goto incompat;
3998 
3999 	mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4000 
4001 	dev_info(DEV, "Handshake successful: "
4002 	     "Agreed network protocol version %d\n", mdev->agreed_pro_version);
4003 
4004 	return 1;
4005 
4006  incompat:
4007 	dev_err(DEV, "incompatible DRBD dialects: "
4008 	    "I support %d-%d, peer supports %d-%d\n",
4009 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
4010 	    p->protocol_min, p->protocol_max);
4011 	return -1;
4012 }
4013 
4014 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4015 static int drbd_do_auth(struct drbd_conf *mdev)
4016 {
4017 	dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4018 	dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4019 	return -1;
4020 }
4021 #else
4022 #define CHALLENGE_LEN 64
4023 
4024 /* Return value:
4025 	1 - auth succeeded,
4026 	0 - failed, try again (network error),
4027 	-1 - auth failed, don't try again.
4028 */
4029 
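/* Rough flow, as implemented below (both peers run it symmetrically):
 *   1. send our random challenge              (P_AUTH_CHALLENGE)
 *   2. receive the peer's challenge
 *   3. send HMAC(shared_secret, peer's challenge) as P_AUTH_RESPONSE
 *   4. receive the peer's response and compare it against
 *      HMAC(shared_secret, our own challenge)
 */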
4030 static int drbd_do_auth(struct drbd_conf *mdev)
4031 {
4032 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4033 	struct scatterlist sg;
4034 	char *response = NULL;
4035 	char *right_response = NULL;
4036 	char *peers_ch = NULL;
4037 	unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4038 	unsigned int resp_size;
4039 	struct hash_desc desc;
4040 	enum drbd_packets cmd;
4041 	unsigned int length;
4042 	int rv;
4043 
4044 	desc.tfm = mdev->cram_hmac_tfm;
4045 	desc.flags = 0;
4046 
4047 	rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4048 				(u8 *)mdev->net_conf->shared_secret, key_len);
4049 	if (rv) {
4050 		dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4051 		rv = -1;
4052 		goto fail;
4053 	}
4054 
4055 	get_random_bytes(my_challenge, CHALLENGE_LEN);
4056 
4057 	rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4058 	if (!rv)
4059 		goto fail;
4060 
4061 	rv = drbd_recv_header(mdev, &cmd, &length);
4062 	if (!rv)
4063 		goto fail;
4064 
4065 	if (cmd != P_AUTH_CHALLENGE) {
4066 		dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4067 		    cmdname(cmd), cmd);
4068 		rv = 0;
4069 		goto fail;
4070 	}
4071 
4072 	if (length > CHALLENGE_LEN * 2) {
4073 		dev_err(DEV, "AuthChallenge payload too big.\n");
4074 		rv = -1;
4075 		goto fail;
4076 	}
4077 
4078 	peers_ch = kmalloc(length, GFP_NOIO);
4079 	if (peers_ch == NULL) {
4080 		dev_err(DEV, "kmalloc of peers_ch failed\n");
4081 		rv = -1;
4082 		goto fail;
4083 	}
4084 
4085 	rv = drbd_recv(mdev, peers_ch, length);
4086 
4087 	if (rv != length) {
4088 		if (!signal_pending(current))
4089 			dev_warn(DEV, "short read AuthChallenge: l=%u\n", rv);
4090 		rv = 0;
4091 		goto fail;
4092 	}
4093 
4094 	resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4095 	response = kmalloc(resp_size, GFP_NOIO);
4096 	if (response == NULL) {
4097 		dev_err(DEV, "kmalloc of response failed\n");
4098 		rv = -1;
4099 		goto fail;
4100 	}
4101 
4102 	sg_init_table(&sg, 1);
4103 	sg_set_buf(&sg, peers_ch, length);
4104 
4105 	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4106 	if (rv) {
4107 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4108 		rv = -1;
4109 		goto fail;
4110 	}
4111 
4112 	rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4113 	if (!rv)
4114 		goto fail;
4115 
4116 	rv = drbd_recv_header(mdev, &cmd, &length);
4117 	if (!rv)
4118 		goto fail;
4119 
4120 	if (cmd != P_AUTH_RESPONSE) {
4121 		dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4122 			cmdname(cmd), cmd);
4123 		rv = 0;
4124 		goto fail;
4125 	}
4126 
4127 	if (length != resp_size) {
4128 		dev_err(DEV, "AuthResponse payload has wrong size\n");
4129 		rv = 0;
4130 		goto fail;
4131 	}
4132 
4133 	rv = drbd_recv(mdev, response, resp_size);
4134 
4135 	if (rv != resp_size) {
4136 		if (!signal_pending(current))
4137 			dev_warn(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4138 		rv = 0;
4139 		goto fail;
4140 	}
4141 
4142 	right_response = kmalloc(resp_size, GFP_NOIO);
4143 	if (right_response == NULL) {
4144 		dev_err(DEV, "kmalloc of right_response failed\n");
4145 		rv = -1;
4146 		goto fail;
4147 	}
4148 
4149 	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4150 
4151 	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4152 	if (rv) {
4153 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4154 		rv = -1;
4155 		goto fail;
4156 	}
4157 
4158 	rv = !memcmp(response, right_response, resp_size);
4159 
4160 	if (rv)
4161 		dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4162 		     resp_size, mdev->net_conf->cram_hmac_alg);
4163 	else
4164 		rv = -1;
4165 
4166  fail:
4167 	kfree(peers_ch);
4168 	kfree(response);
4169 	kfree(right_response);
4170 
4171 	return rv;
4172 }
4173 #endif
4174 
4175 int drbdd_init(struct drbd_thread *thi)
4176 {
4177 	struct drbd_conf *mdev = thi->mdev;
4178 	unsigned int minor = mdev_to_minor(mdev);
4179 	int h;
4180 
4181 	sprintf(current->comm, "drbd%d_receiver", minor);
4182 
4183 	dev_info(DEV, "receiver (re)started\n");
4184 
4185 	do {
4186 		h = drbd_connect(mdev);
4187 		if (h == 0) {
4188 			drbd_disconnect(mdev);
4189 			schedule_timeout_interruptible(HZ);
4190 		}
4191 		if (h == -1) {
4192 			dev_warn(DEV, "Discarding network configuration.\n");
4193 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4194 		}
4195 	} while (h == 0);
4196 
4197 	if (h > 0) {
4198 		if (get_net_conf(mdev)) {
4199 			drbdd(mdev);
4200 			put_net_conf(mdev);
4201 		}
4202 	}
4203 
4204 	drbd_disconnect(mdev);
4205 
4206 	dev_info(DEV, "receiver terminated\n");
4207 	return 0;
4208 }
4209 
4210 /* ********* acknowledge sender ******** */
4211 
4212 static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
4213 {
4214 	struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4215 
4216 	int retcode = be32_to_cpu(p->retcode);
4217 
4218 	if (retcode >= SS_SUCCESS) {
4219 		set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4220 	} else {
4221 		set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4222 		dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4223 		    drbd_set_st_err_str(retcode), retcode);
4224 	}
4225 	wake_up(&mdev->state_wait);
4226 
4227 	return true;
4228 }
4229 
4230 static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
4231 {
4232 	return drbd_send_ping_ack(mdev);
4233 
4234 }
4235 
4236 static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
4237 {
4238 	/* restore idle timeout */
4239 	mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4240 	if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4241 		wake_up(&mdev->misc_wait);
4242 
4243 	return true;
4244 }
4245 
4246 static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
4247 {
4248 	struct p_block_ack *p = (struct p_block_ack *)h;
4249 	sector_t sector = be64_to_cpu(p->sector);
4250 	int blksize = be32_to_cpu(p->blksize);
4251 
4252 	D_ASSERT(mdev->agreed_pro_version >= 89);
4253 
4254 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4255 
4256 	if (get_ldev(mdev)) {
4257 		drbd_rs_complete_io(mdev, sector);
4258 		drbd_set_in_sync(mdev, sector, blksize);
4259 		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4260 		mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4261 		put_ldev(mdev);
4262 	}
4263 	dec_rs_pending(mdev);
4264 	atomic_add(blksize >> 9, &mdev->rs_sect_in);
4265 
4266 	return true;
4267 }
4268 
4269 /* when we receive the ACK for a write request,
4270  * verify that we actually know about it */
4271 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4272 	u64 id, sector_t sector)
4273 {
4274 	struct hlist_head *slot = tl_hash_slot(mdev, sector);
4275 	struct hlist_node *n;
4276 	struct drbd_request *req;
4277 
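	/* The peer echoes our request pointer back as block_id; matching the
	 * pointer is sufficient, the sector is only cross-checked as a
	 * sanity check. */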
4278 	hlist_for_each_entry(req, n, slot, colision) {
4279 		if ((unsigned long)req == (unsigned long)id) {
4280 			if (req->sector != sector) {
4281 				dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4282 				    "wrong sector (%llus versus %llus)\n", req,
4283 				    (unsigned long long)req->sector,
4284 				    (unsigned long long)sector);
4285 				break;
4286 			}
4287 			return req;
4288 		}
4289 	}
4290 	return NULL;
4291 }
4292 
4293 typedef struct drbd_request *(req_validator_fn)
4294 	(struct drbd_conf *mdev, u64 id, sector_t sector);
4295 
4296 static int validate_req_change_req_state(struct drbd_conf *mdev,
4297 	u64 id, sector_t sector, req_validator_fn validator,
4298 	const char *func, enum drbd_req_event what)
4299 {
4300 	struct drbd_request *req;
4301 	struct bio_and_error m;
4302 
4303 	spin_lock_irq(&mdev->req_lock);
4304 	req = validator(mdev, id, sector);
4305 	if (unlikely(!req)) {
4306 		spin_unlock_irq(&mdev->req_lock);
4307 
4308 		dev_err(DEV, "%s: failed to find req %p, sector %llus\n", func,
4309 			(void *)(unsigned long)id, (unsigned long long)sector);
4310 		return false;
4311 	}
4312 	__req_mod(req, what, &m);
4313 	spin_unlock_irq(&mdev->req_lock);
4314 
4315 	if (m.bio)
4316 		complete_master_bio(mdev, &m);
4317 	return true;
4318 }
4319 
4320 static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
4321 {
4322 	struct p_block_ack *p = (struct p_block_ack *)h;
4323 	sector_t sector = be64_to_cpu(p->sector);
4324 	int blksize = be32_to_cpu(p->blksize);
4325 	enum drbd_req_event what;
4326 
4327 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4328 
4329 	if (is_syncer_block_id(p->block_id)) {
4330 		drbd_set_in_sync(mdev, sector, blksize);
4331 		dec_rs_pending(mdev);
4332 		return true;
4333 	}
4334 	switch (be16_to_cpu(h->command)) {
4335 	case P_RS_WRITE_ACK:
4336 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4337 		what = write_acked_by_peer_and_sis;
4338 		break;
4339 	case P_WRITE_ACK:
4340 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4341 		what = write_acked_by_peer;
4342 		break;
4343 	case P_RECV_ACK:
4344 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4345 		what = recv_acked_by_peer;
4346 		break;
4347 	case P_DISCARD_ACK:
4348 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4349 		what = conflict_discarded_by_peer;
4350 		break;
4351 	default:
4352 		D_ASSERT(0);
4353 		return false;
4354 	}
4355 
4356 	return validate_req_change_req_state(mdev, p->block_id, sector,
4357 		_ack_id_to_req, __func__ , what);
4358 }
4359 
4360 static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
4361 {
4362 	struct p_block_ack *p = (struct p_block_ack *)h;
4363 	sector_t sector = be64_to_cpu(p->sector);
4364 	int size = be32_to_cpu(p->blksize);
4365 	struct drbd_request *req;
4366 	struct bio_and_error m;
4367 
4368 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4369 
4370 	if (is_syncer_block_id(p->block_id)) {
4371 		dec_rs_pending(mdev);
4372 		drbd_rs_failed_io(mdev, sector, size);
4373 		return true;
4374 	}
4375 
4376 	spin_lock_irq(&mdev->req_lock);
4377 	req = _ack_id_to_req(mdev, p->block_id, sector);
4378 	if (!req) {
4379 		spin_unlock_irq(&mdev->req_lock);
4380 		if (mdev->net_conf->wire_protocol == DRBD_PROT_A ||
4381 		    mdev->net_conf->wire_protocol == DRBD_PROT_B) {
4382 			/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4383 			   The master bio might already be completed, therefore the
4384 			   request is no longer in the collision hash.
4385 			   => Do not try to validate block_id as request. */
4386 			/* In Protocol B we might already have got a P_RECV_ACK
4387 			   but then get a P_NEG_ACK afterwards. */
4388 			drbd_set_out_of_sync(mdev, sector, size);
4389 			return true;
4390 		} else {
4391 			dev_err(DEV, "%s: failed to find req %p, sector %llus\n", __func__,
4392 				(void *)(unsigned long)p->block_id, (unsigned long long)sector);
4393 			return false;
4394 		}
4395 	}
4396 	__req_mod(req, neg_acked, &m);
4397 	spin_unlock_irq(&mdev->req_lock);
4398 
4399 	if (m.bio)
4400 		complete_master_bio(mdev, &m);
4401 	return true;
4402 }
4403 
4404 static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
4405 {
4406 	struct p_block_ack *p = (struct p_block_ack *)h;
4407 	sector_t sector = be64_to_cpu(p->sector);
4408 
4409 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4410 	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4411 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
4412 
4413 	return validate_req_change_req_state(mdev, p->block_id, sector,
4414 		_ar_id_to_req, __func__ , neg_acked);
4415 }
4416 
4417 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
4418 {
4419 	sector_t sector;
4420 	int size;
4421 	struct p_block_ack *p = (struct p_block_ack *)h;
4422 
4423 	sector = be64_to_cpu(p->sector);
4424 	size = be32_to_cpu(p->blksize);
4425 
4426 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4427 
4428 	dec_rs_pending(mdev);
4429 
4430 	if (get_ldev_if_state(mdev, D_FAILED)) {
4431 		drbd_rs_complete_io(mdev, sector);
4432 		switch (be16_to_cpu(h->command)) {
4433 		case P_NEG_RS_DREPLY:
4434 			drbd_rs_failed_io(mdev, sector, size);
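			/* fall through */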
4435 		case P_RS_CANCEL:
4436 			break;
4437 		default:
4438 			D_ASSERT(0);
4439 			put_ldev(mdev);
4440 			return false;
4441 		}
4442 		put_ldev(mdev);
4443 	}
4444 
4445 	return true;
4446 }
4447 
4448 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
4449 {
4450 	struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4451 
4452 	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4453 
4454 	if (mdev->state.conn == C_AHEAD &&
4455 	    atomic_read(&mdev->ap_in_flight) == 0 &&
4456 	    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4457 		mdev->start_resync_timer.expires = jiffies + HZ;
4458 		add_timer(&mdev->start_resync_timer);
4459 	}
4460 
4461 	return true;
4462 }
4463 
4464 static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
4465 {
4466 	struct p_block_ack *p = (struct p_block_ack *)h;
4467 	struct drbd_work *w;
4468 	sector_t sector;
4469 	int size;
4470 
4471 	sector = be64_to_cpu(p->sector);
4472 	size = be32_to_cpu(p->blksize);
4473 
4474 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4475 
4476 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4477 		drbd_ov_oos_found(mdev, sector, size);
4478 	else
4479 		ov_oos_print(mdev);
4480 
4481 	if (!get_ldev(mdev))
4482 		return true;
4483 
4484 	drbd_rs_complete_io(mdev, sector);
4485 	dec_rs_pending(mdev);
4486 
4487 	--mdev->ov_left;
4488 
4489 	/* let's advance progress step marks only for every other megabyte */
4490 	if ((mdev->ov_left & 0x200) == 0x200)
4491 		drbd_advance_rs_marks(mdev, mdev->ov_left);
4492 
4493 	if (mdev->ov_left == 0) {
4494 		w = kmalloc(sizeof(*w), GFP_NOIO);
4495 		if (w) {
4496 			w->cb = w_ov_finished;
4497 			drbd_queue_work_front(&mdev->data.work, w);
4498 		} else {
4499 			dev_err(DEV, "kmalloc(w) failed.");
4500 			ov_oos_print(mdev);
4501 			drbd_resync_finished(mdev);
4502 		}
4503 	}
4504 	put_ldev(mdev);
4505 	return true;
4506 }
4507 
4508 static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
4509 {
4510 	return true;
4511 }
4512 
4513 struct asender_cmd {
4514 	size_t pkt_size;
4515 	int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
4516 };
4517 
4518 static struct asender_cmd *get_asender_cmd(int cmd)
4519 {
4520 	static struct asender_cmd asender_tbl[] = {
4521 		/* anything missing from this table is in
4522 		 * the drbd_cmd_handler (drbd_default_handler) table,
4523 		 * see the beginning of drbdd() */
4524 	[P_PING]	    = { sizeof(struct p_header80), got_Ping },
4525 	[P_PING_ACK]	    = { sizeof(struct p_header80), got_PingAck },
4526 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4527 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4528 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4529 	[P_DISCARD_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4530 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
4531 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
4532 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4533 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
4534 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
4535 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4536 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4537 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4538 	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply},
4539 	[P_MAX_CMD]	    = { 0, NULL },
4540 	};
4541 	if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4542 		return NULL;
4543 	return &asender_tbl[cmd];
4544 }
4545 
4546 int drbd_asender(struct drbd_thread *thi)
4547 {
4548 	struct drbd_conf *mdev = thi->mdev;
4549 	struct p_header80 *h = &mdev->meta.rbuf.header.h80;
4550 	struct asender_cmd *cmd = NULL;
4551 
4552 	int rv, len;
4553 	void *buf    = h;
4554 	int received = 0;
4555 	int expect   = sizeof(struct p_header80);
4556 	int empty;
4557 
4558 	sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4559 
4560 	current->policy = SCHED_RR;  /* Make this a realtime task! */
4561 	current->rt_priority = 2;    /* more important than all other tasks */
4562 
4563 	while (get_t_state(thi) == Running) {
4564 		drbd_thread_current_set_cpu(mdev);
4565 		if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4566 			ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4567 			mdev->meta.socket->sk->sk_rcvtimeo =
4568 				mdev->net_conf->ping_timeo*HZ/10;
4569 		}
4570 
4571 		/* conditionally cork;
4572 		 * it may hurt latency if we cork without much to send */
4573 		if (!mdev->net_conf->no_cork &&
4574 			3 < atomic_read(&mdev->unacked_cnt))
4575 			drbd_tcp_cork(mdev->meta.socket);
4576 		while (1) {
4577 			clear_bit(SIGNAL_ASENDER, &mdev->flags);
4578 			flush_signals(current);
4579 			if (!drbd_process_done_ee(mdev))
4580 				goto reconnect;
4581 			/* to avoid race with newly queued ACKs */
4582 			set_bit(SIGNAL_ASENDER, &mdev->flags);
4583 			spin_lock_irq(&mdev->req_lock);
4584 			empty = list_empty(&mdev->done_ee);
4585 			spin_unlock_irq(&mdev->req_lock);
4586 			/* new ack may have been queued right here,
4587 			 * but then there is also a signal pending,
4588 			 * and we start over... */
4589 			if (empty)
4590 				break;
4591 		}
4592 		/* but unconditionally uncork unless disabled */
4593 		if (!mdev->net_conf->no_cork)
4594 			drbd_tcp_uncork(mdev->meta.socket);
4595 
4596 		/* short circuit, recv_msg would return EINTR anyways. */
4597 		if (signal_pending(current))
4598 			continue;
4599 
4600 		rv = drbd_recv_short(mdev, mdev->meta.socket,
4601 				     buf, expect-received, 0);
4602 		clear_bit(SIGNAL_ASENDER, &mdev->flags);
4603 
4604 		flush_signals(current);
4605 
4606 		/* Note:
4607 		 * -EINTR	 (on meta) we got a signal
4608 		 * -EAGAIN	 (on meta) rcvtimeo expired
4609 		 * -ECONNRESET	 other side closed the connection
4610 		 * -ERESTARTSYS  (on data) we got a signal
4611 		 * rv <  0	 other than above: unexpected error!
4612 		 * rv == expected: full header or command
4613 		 * rv <  expected: "woken" by signal during receive
4614 		 * rv == 0	 : "connection shut down by peer"
4615 		 */
4616 		if (likely(rv > 0)) {
4617 			received += rv;
4618 			buf	 += rv;
4619 		} else if (rv == 0) {
4620 			dev_err(DEV, "meta connection shut down by peer.\n");
4621 			goto reconnect;
4622 		} else if (rv == -EAGAIN) {
4623 			if (mdev->meta.socket->sk->sk_rcvtimeo ==
4624 			    mdev->net_conf->ping_timeo*HZ/10) {
4625 				dev_err(DEV, "PingAck did not arrive in time.\n");
4626 				goto reconnect;
4627 			}
4628 			set_bit(SEND_PING, &mdev->flags);
4629 			continue;
4630 		} else if (rv == -EINTR) {
4631 			continue;
4632 		} else {
4633 			dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4634 			goto reconnect;
4635 		}
4636 
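		/* A complete header has arrived: validate the magic, look up the
		 * handler, and learn how many more payload bytes to expect. */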
4637 		if (received == expect && cmd == NULL) {
4638 			if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4639 				dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
4640 				    be32_to_cpu(h->magic),
4641 				    be16_to_cpu(h->command),
4642 				    be16_to_cpu(h->length));
4643 				goto reconnect;
4644 			}
4645 			cmd = get_asender_cmd(be16_to_cpu(h->command));
4646 			len = be16_to_cpu(h->length);
4647 			if (unlikely(cmd == NULL)) {
4648 				dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
4649 				    be32_to_cpu(h->magic),
4650 				    be16_to_cpu(h->command),
4651 				    be16_to_cpu(h->length));
4652 				goto disconnect;
4653 			}
4654 			expect = cmd->pkt_size;
4655 			ERR_IF(len != expect-sizeof(struct p_header80))
4656 				goto reconnect;
4657 		}
4658 		if (received == expect) {
4659 			D_ASSERT(cmd != NULL);
4660 			if (!cmd->process(mdev, h))
4661 				goto reconnect;
4662 
4663 			buf	 = h;
4664 			received = 0;
4665 			expect	 = sizeof(struct p_header80);
4666 			cmd	 = NULL;
4667 		}
4668 	}
4669 
4670 	if (0) {
4671 reconnect:
4672 		drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4673 		drbd_md_sync(mdev);
4674 	}
4675 	if (0) {
4676 disconnect:
4677 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4678 		drbd_md_sync(mdev);
4679 	}
4680 	clear_bit(SIGNAL_ASENDER, &mdev->flags);
4681 
4682 	D_ASSERT(mdev->state.conn < C_CONNECTED);
4683 	dev_info(DEV, "asender terminated\n");
4684 
4685 	return 0;
4686 }
4687