/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -	When a process places a call, it allocates a request slot if
 *	one is available. Otherwise, it sleeps on the backlog queue
 *	(xprt_reserve).
 *  -	Next, the caller puts together the RPC message, stuffs it into
 *	the request struct, and calls xprt_call().
 *  -	xprt_call transmits the message and installs the caller on the
 *	socket's wait list. At the same time, it installs a timer that
 *	is run after the packet's timeout has expired.
 *  -	When a packet arrives, the data_ready handler walks the list of
 *	pending requests for that socket. If a matching XID is found, the
 *	caller is woken up, and the timer removed.
 *  -	When no reply arrives within the timeout interval, the timer is
 *	fired by the kernel and runs xprt_timer(). It either adjusts the
 *	timeout values (minor timeout) or wakes up the caller with a status
 *	of -ETIMEDOUT.
 *  -	When the caller receives a notification from RPC that a reply arrived,
 *	it should release the RPC slot, and process the reply.
 *	If the call timed out, it may choose to retry the operation by
 *	adjusting the initial timeout value, and simply calling rpc_call
 *	again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP NFS related read + write fixes
 *   (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 *  Rewrite of large parts of the code in order to stabilize TCP stuff.
 *  Fix behaviour when socket buffer is full.
 *   (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 */
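
/*
 * For orientation, the lifecycle described above looks roughly like the
 * sketch below from a caller's point of view.  This is an illustrative,
 * hypothetical fragment only (error handling and the real scheduler
 * plumbing are omitted); the actual entry points are xprt_reserve(),
 * xprt_transmit() and xprt_release() defined later in this file.
 *
 *	xprt_reserve(task);	// get a request slot, or sleep on the
 *				// backlog queue until one frees up
 *	// marshal the call into task->tk_rqstp->rq_snd_buf
 *	xprt_transmit(task);	// send it and queue the request on
 *				// xprt->recv for reply matching
 *	// data_ready matches the reply XID and wakes the task;
 *	// on timeout, xprt_timer() wakes it with -ETIMEDOUT instead
 *	xprt_release(task);	// give the slot back
 */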

#define __KERNEL_SYSCALLS__

#include <linux/version.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/capability.h>
#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/unistd.h>
#include <linux/sunrpc/clnt.h>
#include <linux/file.h>

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>

#include <asm/uaccess.h>

/*
 * Local variables
 */

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

#define XPRT_MAX_BACKOFF	(8)

/*
 * Local functions
 */
static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static void	do_xprt_transmit(struct rpc_task *);
static inline void	do_xprt_reserve(struct rpc_task *);
static void	xprt_disconnect(struct rpc_xprt *);
static void	xprt_connect_status(struct rpc_task *task);
static struct socket *xprt_create_socket(int, struct rpc_timeout *, int);
static int	xprt_bind_socket(struct rpc_xprt *, struct socket *);
static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);

#ifdef RPC_DEBUG_DATA
/*
 * Print the buffer contents (first 128 bytes only--just enough for
 * diropres return).
 */
static void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8	*buf = (u8 *) packet;
	int	j;

	dprintk("RPC:      %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
static inline void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif

/*
 * Look up RPC transport given an INET socket
 */
static inline struct rpc_xprt *
xprt_from_sock(struct sock *sk)
{
	return (struct rpc_xprt *) sk->user_data;
}

/*
 * Serialize write access to sockets, in order to prevent different
 * requests from interfering with each other.
 * Also prevents TCP socket connections from colliding with writes.
 */
static int
__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	if (!xprt->snd_task) {
		if (xprt->nocong || __xprt_get_cong(xprt, task)) {
			xprt->snd_task = task;
			if (req) {
				req->rq_bytes_sent = 0;
				req->rq_ntrans++;
			}
		}
	}
	if (xprt->snd_task != task) {
		dprintk("RPC: %4d TCP write queue full\n", task->tk_pid);
		task->tk_timeout = 0;
		task->tk_status = -EAGAIN;
		if (req && req->rq_ntrans)
			rpc_sleep_on(&xprt->resend, task, NULL, NULL);
		else
			rpc_sleep_on(&xprt->sending, task, NULL, NULL);
	}
	return xprt->snd_task == task;
}

static inline int
xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	int retval;
	spin_lock_bh(&xprt->sock_lock);
	retval = __xprt_lock_write(xprt, task);
	spin_unlock_bh(&xprt->sock_lock);
	return retval;
}

static void
__xprt_lock_write_next(struct rpc_xprt *xprt)
{
	struct rpc_task *task;

	if (xprt->snd_task)
		return;
	task = rpc_wake_up_next(&xprt->resend);
	if (!task) {
		if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
			return;
		task = rpc_wake_up_next(&xprt->sending);
		if (!task)
			return;
	}
	if (xprt->nocong || __xprt_get_cong(xprt, task)) {
		struct rpc_rqst *req = task->tk_rqstp;
		xprt->snd_task = task;
		if (req) {
			req->rq_bytes_sent = 0;
			req->rq_ntrans++;
		}
	}
}

/*
 * Releases the socket for use by other requests.
 */
static void
__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task)
		xprt->snd_task = NULL;
	__xprt_lock_write_next(xprt);
}

static inline void
xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	spin_lock_bh(&xprt->sock_lock);
	__xprt_release_write(xprt, task);
	spin_unlock_bh(&xprt->sock_lock);
}

/*
 * Write data to socket.
 */
static inline int
xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	struct socket	*sock = xprt->sock;
	struct msghdr	msg;
	struct xdr_buf	*xdr = &req->rq_snd_buf;
	struct iovec	niv[MAX_IOVEC];
	unsigned int	niov, slen, skip;
	mm_segment_t	oldfs;
	int		result;

	if (!sock)
		return -ENOTCONN;

	xprt_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* Don't repeat bytes that have already been sent */
	skip = req->rq_bytes_sent;
	slen = xdr->len - skip;
	oldfs = get_fs(); set_fs(get_ds());
	do {
		unsigned int slen_part, n;

		niov = xdr_kmap(niv, xdr, skip);
		if (!niov) {
			result = -EAGAIN;
			break;
		}

		msg.msg_flags   = MSG_DONTWAIT|MSG_NOSIGNAL;
		msg.msg_iov	= niv;
		msg.msg_iovlen	= niov;
		msg.msg_name	= (struct sockaddr *) &xprt->addr;
		msg.msg_namelen = sizeof(xprt->addr);
		msg.msg_control = NULL;
		msg.msg_controllen = 0;

		slen_part = 0;
		for (n = 0; n < niov; n++)
			slen_part += niv[n].iov_len;

		clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
		result = sock_sendmsg(sock, &msg, slen_part);

		xdr_kunmap(xdr, skip, niov);

		skip += slen_part;
		slen -= slen_part;
	} while (result >= 0 && slen);
	set_fs(oldfs);

	dprintk("RPC:      xprt_sendmsg(%d) = %d\n", slen, result);

	if (result >= 0)
		return result;

	switch (result) {
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED.
		 */
	case -EAGAIN:
		break;
	case -ECONNRESET:
	case -ENOTCONN:
	case -EPIPE:
		/* connection broken */
		if (xprt->stream)
			result = -ENOTCONN;
		break;
	default:
		printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
	}
	return result;
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (req->rq_cong)
		return 1;
	dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
			task->tk_pid, xprt->cong, xprt->cwnd);
	if (RPCXPRT_CONGESTED(xprt))
		return 0;
	req->rq_cong = 1;
	xprt->cong += RPC_CWNDSCALE;
	return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (!req->rq_cong)
		return;
	req->rq_cong = 0;
	xprt->cong -= RPC_CWNDSCALE;
	__xprt_lock_write_next(xprt);
}

/*
 * Adjust RPC congestion window
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
static void
xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{
	unsigned long	cwnd;

	cwnd = xprt->cwnd;
	if (result >= 0 && cwnd <= xprt->cong) {
		/* The (cwnd >> 1) term makes sure
		 * the result gets rounded properly. */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND)
			cwnd = RPC_MAXCWND;
		__xprt_lock_write_next(xprt);
	} else if (result == -ETIMEDOUT) {
		cwnd >>= 1;
		if (cwnd < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
	}
	dprintk("RPC:      cong %ld, cwnd was %ld, now %ld\n",
			xprt->cong, xprt->cwnd, cwnd);
	xprt->cwnd = cwnd;
}
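
/*
 * A quick worked example of the additive-increase/multiplicative-decrease
 * update above, assuming RPC_CWNDSCALE is 256 (one request slot == 256
 * congestion units, as in the sunrpc headers of this era):
 *
 *	cwnd = 1024 (four slots), reply received:
 *		cwnd += (256 * 256 + 512) / 1024;	// i.e. cwnd += 64
 *
 * so roughly cwnd/RPC_CWNDSCALE successful replies are needed to grow the
 * window by one slot (additive increase), while a timeout halves it in a
 * single step, bounded below by RPC_CWNDSCALE and above by RPC_MAXCWND.
 */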

/*
 * Adjust timeout values etc for next retransmit
 */
int
xprt_adjust_timeout(struct rpc_timeout *to)
{
	if (to->to_retries > 0) {
		if (to->to_exponential)
			to->to_current <<= 1;
		else
			to->to_current += to->to_increment;
		if (to->to_maxval && to->to_current >= to->to_maxval)
			to->to_current = to->to_maxval;
	} else {
		if (to->to_exponential)
			to->to_initval <<= 1;
		else
			to->to_initval += to->to_increment;
		if (to->to_maxval && to->to_initval >= to->to_maxval)
			to->to_initval = to->to_maxval;
		to->to_current = to->to_initval;
	}

	if (!to->to_current) {
		printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
		to->to_current = 5 * HZ;
	}
	pprintk("RPC: %lu %s\n", jiffies,
			to->to_retries? "retrans" : "timeout");
	return to->to_retries-- > 0;
}
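
/*
 * In other words: while retries remain, only to_current is bumped
 * (a minor timeout); once they are exhausted, the base to_initval
 * itself grows, so the next major timeout cycle starts from a larger
 * value.  Both are capped by to_maxval when it is set.
 */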

/*
 * Close down a transport socket
 */
static void
xprt_close(struct rpc_xprt *xprt)
{
	struct socket	*sock = xprt->sock;
	struct sock	*sk = xprt->inet;

	if (!sk)
		return;

	write_lock_bh(&sk->callback_lock);
	xprt->inet = NULL;
	xprt->sock = NULL;

	sk->user_data    = NULL;
	sk->data_ready   = xprt->old_data_ready;
	sk->state_change = xprt->old_state_change;
	sk->write_space  = xprt->old_write_space;
	write_unlock_bh(&sk->callback_lock);

	xprt_disconnect(xprt);
	sk->no_check	 = 0;

	sock_release(sock);
}

/*
 * Mark a transport as disconnected
 */
static void
xprt_disconnect(struct rpc_xprt *xprt)
{
	dprintk("RPC:      disconnected transport %p\n", xprt);
	spin_lock_bh(&xprt->sock_lock);
	xprt_clear_connected(xprt);
	rpc_wake_up_status(&xprt->pending, -ENOTCONN);
	spin_unlock_bh(&xprt->sock_lock);
}

/*
 * Reconnect a broken TCP connection.
 */
void
xprt_connect(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;
	struct socket	*sock = xprt->sock;
	struct sock	*inet;
	int		status;

	dprintk("RPC: %4d xprt_connect %p connected %d\n",
				task->tk_pid, xprt, xprt_connected(xprt));
	if (xprt->shutdown)
		return;

	if (!xprt->addr.sin_port) {
		task->tk_status = -EIO;
		return;
	}

	if (!xprt_lock_write(xprt, task))
		return;
	if (xprt_connected(xprt))
		goto out_write;

	if (task->tk_rqstp)
		task->tk_rqstp->rq_bytes_sent = 0;

	xprt_close(xprt);
	/* Create an unconnected socket */
	sock = xprt_create_socket(xprt->prot, &xprt->timeout, xprt->resvport);
	if (!sock) {
		/* couldn't create socket or bind to reserved port;
		 * this is likely a permanent error, so cause an abort */
		task->tk_status = -EIO;
		goto out_write;
	}
	xprt_bind_socket(xprt, sock);

	if (!xprt->stream)
		goto out_write;

	inet = sock->sk;

	/* Now connect it asynchronously. */
	dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
	status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
				sizeof(xprt->addr), O_NONBLOCK);
	dprintk("RPC: %4d connect status %d connected %d\n",
		task->tk_pid, status, xprt_connected(xprt));

	if (status >= 0)
		return;

	switch (status) {
	case -EALREADY:
	case -EINPROGRESS:
		/* Protect against TCP socket state changes */
		lock_sock(inet);
		if (inet->state != TCP_ESTABLISHED) {
			dprintk("RPC: %4d  waiting for connection\n",
					task->tk_pid);
			task->tk_timeout = RPC_CONNECT_TIMEOUT;
			/* if the socket is already closing, delay briefly */
			if ((1<<inet->state) & ~(TCPF_SYN_SENT|TCPF_SYN_RECV))
				task->tk_timeout = RPC_REESTABLISH_TIMEOUT;
			rpc_sleep_on(&xprt->pending, task, xprt_connect_status,
					NULL);
		}
		release_sock(inet);
		break;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENOTCONN:
		if (!task->tk_client->cl_softrtry) {
			rpc_delay(task, RPC_REESTABLISH_TIMEOUT);
			task->tk_status = -ENOTCONN;
			break;
		}
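		/* fall through */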
	default:
		/* Report myriad other possible returns.  If this file
		 * system is soft mounted, just error out, like Solaris.  */
		if (task->tk_client->cl_softrtry) {
			printk(KERN_WARNING
					"RPC: error %d connecting to server %s, exiting\n",
					-status, task->tk_client->cl_server);
			task->tk_status = -EIO;
			goto out_write;
		}
		printk(KERN_WARNING "RPC: error %d connecting to server %s\n",
				-status, task->tk_client->cl_server);
		/* This will prevent anybody else from connecting */
		rpc_delay(task, RPC_REESTABLISH_TIMEOUT);
		task->tk_status = status;
		break;
	}
	return;
 out_write:
	xprt_release_write(xprt, task);
}

/*
 * We arrive here when awoken from waiting on connection establishment.
 */
static void
xprt_connect_status(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	if (task->tk_status >= 0) {
		dprintk("RPC: %4d xprt_connect_status: connection established\n",
				task->tk_pid);
		return;
	}

	/* if soft mounted, cause this RPC to fail */
	if (task->tk_client->cl_softrtry)
		task->tk_status = -EIO;

	switch (task->tk_status) {
	case -ENOTCONN:
		rpc_delay(task, RPC_REESTABLISH_TIMEOUT);
		return;
	case -ETIMEDOUT:
		dprintk("RPC: %4d xprt_connect_status: timed out\n",
				task->tk_pid);
		break;
	default:
		printk(KERN_ERR "RPC: error %d connecting to server %s\n",
				-task->tk_status, task->tk_client->cl_server);
	}
	xprt_release_write(xprt, task);
}

/*
 * Look up the RPC request corresponding to a reply, and then lock it.
 */
static inline struct rpc_rqst *
xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
	struct list_head *pos;
	struct rpc_rqst	*req = NULL;

	list_for_each(pos, &xprt->recv) {
		struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
		if (entry->rq_xid == xid) {
			req = entry;
			break;
		}
	}
	return req;
}

/*
 * Complete reply received.
 * The TCP code relies on us to remove the request from the xprt->recv list.
 */
static void
xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{
	struct rpc_task	*task = req->rq_task;
	struct rpc_clnt *clnt = task->tk_client;

	/* Adjust congestion window */
	if (!xprt->nocong) {
		int timer = rpcproc_timer(clnt, task->tk_msg.rpc_proc);
		xprt_adjust_cwnd(xprt, copied);
		__xprt_put_cong(xprt, req);
		if (req->rq_ntrans == 1) {
			if (timer)
				rpc_update_rtt(&clnt->cl_rtt, timer, (long)jiffies - req->rq_xtime);
		}
		rpc_set_timeo(&clnt->cl_rtt, timer, req->rq_ntrans - 1);
	}

#ifdef RPC_PROFILE
	/* Profile only reads for now */
	if (copied > 1024) {
		static unsigned long	nextstat = 0;
		static unsigned long	pkt_rtt = 0, pkt_len = 0, pkt_cnt = 0;

		pkt_cnt++;
		pkt_len += req->rq_slen + copied;
		pkt_rtt += jiffies - req->rq_xtime;
		if (time_before(nextstat, jiffies)) {
			printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
			printk("RPC: %ld %ld %ld %ld stat\n",
					jiffies, pkt_cnt, pkt_len, pkt_rtt);
			pkt_rtt = pkt_len = pkt_cnt = 0;
			nextstat = jiffies + 5 * HZ;
		}
	}
#endif

	dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
	req->rq_received = copied;
	list_del_init(&req->rq_list);

	/* ... and wake up the process. */
	rpc_wake_up_task(task);
	return;
}

static size_t
skb_read_bits(skb_reader_t *desc, void *to, size_t len)
{
	if (len > desc->count)
		len = desc->count;
	skb_copy_bits(desc->skb, desc->offset, to, len);
	desc->count -= len;
	desc->offset += len;
	return len;
}

static size_t
skb_read_and_csum_bits(skb_reader_t *desc, void *to, size_t len)
{
	unsigned int csum2, pos;

	if (len > desc->count)
		len = desc->count;
	pos = desc->offset;
	csum2 = skb_copy_and_csum_bits(desc->skb, pos, to, len, 0);
	desc->csum = csum_block_add(desc->csum, csum2, pos);
	desc->count -= len;
	desc->offset += len;
	return len;
}

/*
 * We have set things up such that we perform the checksum of the UDP
 * packet in parallel with the copies into the RPC client iovec.  -DaveM
 */
static int
csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
{
	skb_reader_t desc;

	desc.skb = skb;
	desc.offset = sizeof(struct udphdr);
	desc.count = skb->len - desc.offset;

	if (skb->ip_summed == CHECKSUM_UNNECESSARY)
		goto no_checksum;

	desc.csum = csum_partial(skb->data, desc.offset, skb->csum);
	xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_and_csum_bits);
	if (desc.offset != skb->len) {
		unsigned int csum2;
		csum2 = skb_checksum(skb, desc.offset, skb->len - desc.offset, 0);
		desc.csum = csum_block_add(desc.csum, csum2, desc.offset);
	}
	if ((unsigned short)csum_fold(desc.csum))
		return -1;
	return 0;
no_checksum:
	xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_bits);
	return 0;
}
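
/*
 * Note on the scheme above: desc.csum accumulates the running checksum
 * as bytes are copied out, skb_checksum() folds in any tail that was
 * not copied, and a csum_fold() result of zero means the datagram
 * verified clean.  When the hardware has already vouched for the
 * packet (CHECKSUM_UNNECESSARY), the plain-copy path is taken instead.
 */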

/*
 * Input handler for RPC replies. Called from a bottom half and hence
 * atomic.
 */
static void
udp_data_ready(struct sock *sk, int len)
{
	struct rpc_task	*task;
	struct rpc_xprt	*xprt;
	struct rpc_rqst *rovr;
	struct sk_buff	*skb;
	int		err, repsize, copied;

	read_lock(&sk->callback_lock);
	dprintk("RPC:      udp_data_ready...\n");
	if (sk->dead || !(xprt = xprt_from_sock(sk))) {
		printk("RPC:      udp_data_ready request not found!\n");
		goto out;
	}

	dprintk("RPC:      udp_data_ready client %p\n", xprt);

	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		goto out;

	if (xprt->shutdown)
		goto dropit;

	repsize = skb->len - sizeof(struct udphdr);
	if (repsize < 4) {
		printk("RPC: impossible RPC reply size %d!\n", repsize);
		goto dropit;
	}

	/* Look up and lock the request corresponding to the given XID */
	spin_lock(&xprt->sock_lock);
	rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr)));
	if (!rovr)
		goto out_unlock;
	task = rovr->rq_task;

	dprintk("RPC: %4d received reply\n", task->tk_pid);
	xprt_pktdump("packet data:",
		     (u32 *) (skb->h.raw+sizeof(struct udphdr)), repsize);

	if ((copied = rovr->rq_private_buf.len) > repsize)
		copied = repsize;

	/* Suck it into the iovec, verify checksum if not done by hw. */
	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
		goto out_unlock;

	/* Something worked... */
	dst_confirm(skb->dst);

	xprt_complete_rqst(xprt, rovr, copied);

 out_unlock:
	spin_unlock(&xprt->sock_lock);
 dropit:
	skb_free_datagram(sk, skb);
 out:
	if (sk->sleep && waitqueue_active(sk->sleep))
		wake_up_interruptible(sk->sleep);
	read_unlock(&sk->callback_lock);
}

/*
 * Copy from an skb into memory and shrink the skb.
 */
static inline size_t
tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
{
	if (len > desc->count)
		len = desc->count;
	skb_copy_bits(desc->skb, desc->offset, p, len);
	desc->offset += len;
	desc->count -= len;
	return len;
}

/*
 * TCP read fragment marker
 */
static inline void
tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	size_t len, used;
	char *p;

	p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
	len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
	used = tcp_copy_data(desc, p, len);
	xprt->tcp_offset += used;
	if (used != len)
		return;
	xprt->tcp_reclen = ntohl(xprt->tcp_recm);
	if (xprt->tcp_reclen & 0x80000000)
		xprt->tcp_flags |= XPRT_LAST_FRAG;
	else
		xprt->tcp_flags &= ~XPRT_LAST_FRAG;
	xprt->tcp_reclen &= 0x7fffffff;
	xprt->tcp_flags &= ~XPRT_COPY_RECM;
	xprt->tcp_offset = 0;
	/* Sanity check of the record length */
	if (xprt->tcp_reclen < 4) {
		printk(KERN_ERR "RPC: Invalid TCP record fragment length\n");
		xprt_disconnect(xprt);
	}
	dprintk("RPC:      reading TCP record fragment of length %d\n",
			xprt->tcp_reclen);
}
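
/*
 * The 4-byte value decoded above is the standard RPC-over-TCP record
 * marker (RFC 1831 record marking): the top bit flags the last fragment
 * of a record, and the low 31 bits carry the fragment length.  For
 * example, a marker of htonl(0x8000000c) means "final fragment,
 * 12 bytes follow".
 */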

static void
tcp_check_recm(struct rpc_xprt *xprt)
{
	if (xprt->tcp_offset == xprt->tcp_reclen) {
		xprt->tcp_flags |= XPRT_COPY_RECM;
		xprt->tcp_offset = 0;
		if (xprt->tcp_flags & XPRT_LAST_FRAG) {
			xprt->tcp_flags &= ~XPRT_COPY_DATA;
			xprt->tcp_flags |= XPRT_COPY_XID;
			xprt->tcp_copied = 0;
		}
	}
}

/*
 * TCP read xid
 */
static inline void
tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	size_t len, used;
	char *p;

	len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
	dprintk("RPC:      reading XID (%Zu bytes)\n", len);
	p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
	used = tcp_copy_data(desc, p, len);
	xprt->tcp_offset += used;
	if (used != len)
		return;
	xprt->tcp_flags &= ~XPRT_COPY_XID;
	xprt->tcp_flags |= XPRT_COPY_DATA;
	xprt->tcp_copied = 4;
	dprintk("RPC:      reading reply for XID %08x\n", xprt->tcp_xid);
	tcp_check_recm(xprt);
}

/*
 * TCP read and complete request
 */
static inline void
tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	struct rpc_rqst *req;
	struct xdr_buf *rcvbuf;
	size_t len;

	/* Find and lock the request corresponding to this xid */
	spin_lock(&xprt->sock_lock);
	req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
	if (!req) {
		xprt->tcp_flags &= ~XPRT_COPY_DATA;
		dprintk("RPC:      XID %08x request not found!\n",
				xprt->tcp_xid);
		spin_unlock(&xprt->sock_lock);
		return;
	}

	rcvbuf = &req->rq_private_buf;
	len = desc->count;
	if (len > xprt->tcp_reclen - xprt->tcp_offset) {
		skb_reader_t my_desc;

		len = xprt->tcp_reclen - xprt->tcp_offset;
		memcpy(&my_desc, desc, sizeof(my_desc));
		my_desc.count = len;
		xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
					  &my_desc, tcp_copy_data);
		desc->count -= len;
		desc->offset += len;
	} else
		xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
					  desc, tcp_copy_data);
	xprt->tcp_copied += len;
	xprt->tcp_offset += len;

	if (xprt->tcp_copied == req->rq_private_buf.len)
		xprt->tcp_flags &= ~XPRT_COPY_DATA;
	else if (xprt->tcp_offset == xprt->tcp_reclen) {
		if (xprt->tcp_flags & XPRT_LAST_FRAG)
			xprt->tcp_flags &= ~XPRT_COPY_DATA;
	}

	if (!(xprt->tcp_flags & XPRT_COPY_DATA)) {
		dprintk("RPC: %4d received reply complete\n",
				req->rq_task->tk_pid);
		xprt_complete_rqst(xprt, req, xprt->tcp_copied);
	}
	spin_unlock(&xprt->sock_lock);
	tcp_check_recm(xprt);
}

/*
 * TCP discard extra bytes from a short read
 */
static inline void
tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	size_t len;

	len = xprt->tcp_reclen - xprt->tcp_offset;
	if (len > desc->count)
		len = desc->count;
	desc->count -= len;
	desc->offset += len;
	xprt->tcp_offset += len;
	tcp_check_recm(xprt);
}

/*
 * TCP record receive routine
 * We first have to grab the record marker, then the XID, then the data.
 */
static int
tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
		unsigned int offset, size_t len)
{
	struct rpc_xprt *xprt = (struct rpc_xprt *)rd_desc->buf;
	skb_reader_t desc = { skb, offset, len };

	dprintk("RPC:      tcp_data_recv\n");
	do {
		/* Read in a new fragment marker if necessary */
		/* Can we ever really expect to get completely empty fragments? */
		if (xprt->tcp_flags & XPRT_COPY_RECM) {
			tcp_read_fraghdr(xprt, &desc);
			continue;
		}
		/* Read in the xid if necessary */
		if (xprt->tcp_flags & XPRT_COPY_XID) {
			tcp_read_xid(xprt, &desc);
			continue;
		}
		/* Read in the request data */
		if (xprt->tcp_flags & XPRT_COPY_DATA) {
			tcp_read_request(xprt, &desc);
			continue;
		}
		/* Skip over any trailing bytes on short reads */
		tcp_read_discard(xprt, &desc);
	} while (desc.count);
	dprintk("RPC:      tcp_data_recv done\n");
	return len - desc.count;
}

static void tcp_data_ready(struct sock *sk, int bytes)
{
	struct rpc_xprt *xprt;
	read_descriptor_t rd_desc;

	read_lock(&sk->callback_lock);
	dprintk("RPC:      tcp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk))) {
		printk("RPC:      tcp_data_ready socket info not found!\n");
		goto out;
	}
	if (xprt->shutdown)
		goto out;

	/* We use rd_desc to pass struct xprt to tcp_data_recv */
	rd_desc.buf = (char *)xprt;
	rd_desc.count = 65536;
	tcp_read_sock(sk, &rd_desc, tcp_data_recv);
out:
	read_unlock(&sk->callback_lock);
}

static void
tcp_state_change(struct sock *sk)
{
	struct rpc_xprt	*xprt;

	read_lock(&sk->callback_lock);
	if (!(xprt = xprt_from_sock(sk)))
		goto out;
	dprintk("RPC:      tcp_state_change client %p...\n", xprt);
	dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
				sk->state, xprt_connected(xprt),
				sk->dead, sk->zapped);

	switch (sk->state) {
	case TCP_ESTABLISHED:
		if (xprt_test_and_set_connected(xprt))
			break;

		/* Reset TCP record info */
		xprt->tcp_offset = 0;
		xprt->tcp_reclen = 0;
		xprt->tcp_copied = 0;
		xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;

		spin_lock_bh(&xprt->sock_lock);
		if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
			rpc_wake_up_task(xprt->snd_task);
		spin_unlock_bh(&xprt->sock_lock);
		break;
	case TCP_SYN_SENT:
	case TCP_SYN_RECV:
		break;
	default:
		xprt_disconnect(xprt);
		break;
	}
 out:
	if (sk->sleep && waitqueue_active(sk->sleep))
		wake_up_interruptible_all(sk->sleep);
	read_unlock(&sk->callback_lock);
}

/*
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing sock_sendmsg
 * with a bunch of small requests.
 */
static void
xprt_write_space(struct sock *sk)
{
	struct rpc_xprt	*xprt;
	struct socket	*sock;

	read_lock(&sk->callback_lock);
	if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->socket))
		goto out;
	if (xprt->shutdown)
		goto out;

	/* Wait until we have enough socket memory */
	if (xprt->stream) {
		/* from net/ipv4/tcp.c:tcp_write_space */
		if (tcp_wspace(sk) < tcp_min_write_space(sk))
			goto out;
	} else {
		/* from net/core/sock.c:sock_def_write_space */
		if (!sock_writeable(sk))
			goto out;
	}

	if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags))
		goto out;

	spin_lock_bh(&xprt->sock_lock);
	if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
		rpc_wake_up_task(xprt->snd_task);
	spin_unlock_bh(&xprt->sock_lock);
	if (sk->sleep && waitqueue_active(sk->sleep))
		wake_up_interruptible(sk->sleep);
out:
	read_unlock(&sk->callback_lock);
}

/*
 * RPC receive timeout handler.
 */
static void
xprt_timer(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	spin_lock(&xprt->sock_lock);
	if (req->rq_received)
		goto out;

	xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT);
	__xprt_put_cong(xprt, req);

	dprintk("RPC: %4d xprt_timer (%s request)\n",
		task->tk_pid, req ? "pending" : "backlogged");

	task->tk_status  = -ETIMEDOUT;
out:
	task->tk_timeout = 0;
	rpc_wake_up_task(task);
	spin_unlock(&xprt->sock_lock);
}

/*
 * Place the actual RPC call.
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void
xprt_transmit(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	struct rpc_xprt	*xprt = req->rq_xprt;

	dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid,
				*(u32 *)(req->rq_svec[0].iov_base));

	if (xprt->shutdown)
		task->tk_status = -EIO;

	if (task->tk_status < 0)
		return;

	if (task->tk_rpcwait)
		rpc_remove_wait_queue(task);

	/* set up everything as needed. */
	/* Write the record marker */
	if (xprt->stream) {
		u32	*marker = req->rq_svec[0].iov_base;

		*marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
	}

	spin_lock_bh(&xprt->sock_lock);
	if (req->rq_received != 0 && !req->rq_bytes_sent)
		goto out_notrans;

	if (!__xprt_lock_write(xprt, task))
		goto out_notrans;

	if (!xprt_connected(xprt)) {
		task->tk_status = -ENOTCONN;
		goto out_notrans;
	}

	if (list_empty(&req->rq_list)) {
		/* Update the softirq receive buffer */
		memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
				sizeof(req->rq_private_buf));
		list_add_tail(&req->rq_list, &xprt->recv);
	}
	spin_unlock_bh(&xprt->sock_lock);

	do_xprt_transmit(task);
	return;
out_notrans:
	spin_unlock_bh(&xprt->sock_lock);
}

static void
do_xprt_transmit(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	struct rpc_xprt	*xprt = req->rq_xprt;
	int status, retry = 0;

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called xprt_sendmsg().
	 */
	while (1) {
		req->rq_xtime = jiffies;
		status = xprt_sendmsg(xprt, req);

		if (status < 0)
			break;

		if (xprt->stream) {
			req->rq_bytes_sent += status;

			/* If we've sent the entire packet, immediately
			 * reset the count of bytes sent. */
			if (req->rq_bytes_sent >= req->rq_slen) {
				req->rq_bytes_sent = 0;
				goto out_receive;
			}
		} else {
			if (status >= req->rq_slen)
				goto out_receive;
			status = -EAGAIN;
			break;
		}

		dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
				task->tk_pid, req->rq_slen - req->rq_bytes_sent,
				req->rq_slen);

		status = -EAGAIN;
		if (retry++ > 50)
			break;
	}

	/* If we're doing a resend and have received a reply already,
	 * then exit early.
	 * Note, though, that we can't do this if we've already started
	 * resending down a TCP stream.
	 */
	task->tk_status = status;

	switch (status) {
	case -EAGAIN:
		if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
			/* Protect against races with xprt_write_space */
			spin_lock_bh(&xprt->sock_lock);
			/* Don't race with disconnect */
			if (!xprt_connected(xprt))
				task->tk_status = -ENOTCONN;
			else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) {
				task->tk_timeout = req->rq_timeout.to_current;
				rpc_sleep_on(&xprt->pending, task, NULL, NULL);
			}
			spin_unlock_bh(&xprt->sock_lock);
			return;
		}
		/* Keep holding the socket if it is blocked */
		rpc_delay(task, HZ>>4);
		return;
	case -ECONNREFUSED:
		task->tk_timeout = RPC_REESTABLISH_TIMEOUT;
		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
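		/* fall through */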
	case -ENOTCONN:
		return;
	default:
		if (xprt->stream)
			xprt_disconnect(xprt);
	}
	xprt_release_write(xprt, task);
	return;
 out_receive:
	dprintk("RPC: %4d xmit complete\n", task->tk_pid);
	spin_lock_bh(&xprt->sock_lock);
	/* Set the task's receive timeout value */
	if (!xprt->nocong) {
		int timer = rpcproc_timer(clnt, task->tk_msg.rpc_proc);
		task->tk_timeout = rpc_calc_rto(&clnt->cl_rtt, timer);
		task->tk_timeout <<= rpc_ntimeo(&clnt->cl_rtt, timer);
		task->tk_timeout <<= clnt->cl_timeout.to_retries
			- req->rq_timeout.to_retries;
		if (task->tk_timeout > req->rq_timeout.to_maxval)
			task->tk_timeout = req->rq_timeout.to_maxval;
	} else
		task->tk_timeout = req->rq_timeout.to_current;
	/* Don't race with disconnect */
	if (!xprt_connected(xprt))
		task->tk_status = -ENOTCONN;
	else if (!req->rq_received)
		rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
	__xprt_release_write(xprt, task);
	spin_unlock_bh(&xprt->sock_lock);
}

/*
 * Reserve an RPC call slot.
 */
static inline void
do_xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp)
		return;
	if (xprt->free) {
		struct rpc_rqst	*req = xprt->free;
		xprt->free = req->rq_next;
		req->rq_next = NULL;
		task->tk_rqstp = req;
		xprt_request_init(task, xprt);
		return;
	}
	dprintk("RPC:      waiting for request slot\n");
	task->tk_status = -EAGAIN;
	task->tk_timeout = 0;
	rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
}

void
xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	task->tk_status = -EIO;
	if (!xprt->shutdown) {
		spin_lock(&xprt->xprt_lock);
		do_xprt_reserve(task);
		spin_unlock(&xprt->xprt_lock);
	}
}

/*
 * Allocate a 'unique' XID
 */
static u32
xprt_alloc_xid(void)
{
	static spinlock_t xid_lock = SPIN_LOCK_UNLOCKED;
	static int need_init = 1;
	static u32 xid;
	u32 ret;

	spin_lock(&xid_lock);
	if (unlikely(need_init)) {
		xid = CURRENT_TIME << 12;
		need_init = 0;
	}
	ret = xid++;
	spin_unlock(&xid_lock);
	return ret;
}

/*
 * Initialize RPC request
 */
static void
xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
	struct rpc_rqst	*req = task->tk_rqstp;

	req->rq_timeout = xprt->timeout;
	req->rq_task	= task;
	req->rq_xprt    = xprt;
	req->rq_xid     = xprt_alloc_xid();
	INIT_LIST_HEAD(&req->rq_list);
	dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
			req, req->rq_xid);
}

/*
 * Release an RPC call slot
 */
void
xprt_release(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;
	struct rpc_rqst	*req;

	if (!(req = task->tk_rqstp))
		return;
	spin_lock_bh(&xprt->sock_lock);
	__xprt_release_write(xprt, task);
	__xprt_put_cong(xprt, req);
	if (!list_empty(&req->rq_list))
		list_del(&req->rq_list);
	spin_unlock_bh(&xprt->sock_lock);
	task->tk_rqstp = NULL;
	memset(req, 0, sizeof(*req));	/* mark unused */

	dprintk("RPC: %4d release request %p\n", task->tk_pid, req);

	spin_lock(&xprt->xprt_lock);
	req->rq_next = xprt->free;
	xprt->free   = req;

	xprt_clear_backlog(xprt);
	spin_unlock(&xprt->xprt_lock);
}

/*
 * Set default timeout parameters
 */
void
xprt_default_timeout(struct rpc_timeout *to, int proto)
{
	if (proto == IPPROTO_UDP)
		xprt_set_timeout(to, 5,  5 * HZ);
	else
		xprt_set_timeout(to, 5, 60 * HZ);
}

/*
 * Set constant timeout
 */
void
xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{
	to->to_current   =
	to->to_initval   =
	to->to_increment = incr;
	to->to_maxval    = incr * retr;
	to->to_retries   = retr;
	to->to_exponential = 0;
}
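
/*
 * For example, the UDP default above, xprt_set_timeout(to, 5, 5 * HZ),
 * yields to_initval = to_increment = 5s, to_maxval = 25s and five
 * retries: a linear 5s/10s/15s/20s/25s retransmit schedule, since
 * to_exponential is cleared.
 */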

/*
 * Initialize an RPC client
 */
static struct rpc_xprt *
xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
{
	struct rpc_xprt	*xprt;
	struct rpc_rqst	*req;
	int		i;

	dprintk("RPC:      setting up %s transport...\n",
				proto == IPPROTO_UDP? "UDP" : "TCP");

	if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
		return NULL;
	memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */

	xprt->addr = *ap;
	xprt->prot = proto;
	xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
	if (xprt->stream) {
		xprt->cwnd = RPC_MAXCWND;
		xprt->nocong = 1;
	} else
		xprt->cwnd = RPC_INITCWND;
	spin_lock_init(&xprt->sock_lock);
	spin_lock_init(&xprt->xprt_lock);
	init_waitqueue_head(&xprt->cong_wait);

	INIT_LIST_HEAD(&xprt->recv);

	/* Set timeout parameters */
	if (to) {
		xprt->timeout = *to;
		xprt->timeout.to_current = to->to_initval;
	} else
		xprt_default_timeout(&xprt->timeout, xprt->prot);

	INIT_RPC_WAITQ(&xprt->pending, "xprt_pending");
	INIT_RPC_WAITQ(&xprt->sending, "xprt_sending");
	INIT_RPC_WAITQ(&xprt->resend, "xprt_resend");
	INIT_RPC_WAITQ(&xprt->backlog, "xprt_backlog");

	/* initialize free list */
	for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
		req->rq_next = req + 1;
	req->rq_next = NULL;
	xprt->free = xprt->slot;

	/* Check whether we want to use a reserved port */
	xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;

	dprintk("RPC:      created transport %p\n", xprt);

	return xprt;
}

/*
 * Bind to a reserved port
 */
static inline int
xprt_bindresvport(struct socket *sock)
{
	struct sockaddr_in myaddr;
	int		err, port;
	kernel_cap_t saved_cap = current->cap_effective;

	/* Override capabilities.
	 * They were checked in xprt_create_proto, i.e. at mount time.
	 */
	cap_raise(current->cap_effective, CAP_NET_BIND_SERVICE);

	memset(&myaddr, 0, sizeof(myaddr));
	myaddr.sin_family = AF_INET;
	port = 800;
	do {
		myaddr.sin_port = htons(port);
		err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
						sizeof(myaddr));
	} while (err == -EADDRINUSE && --port > 0);
	current->cap_effective = saved_cap;

	if (err < 0)
		printk("RPC: Can't bind to reserved port (%d).\n", -err);

	return err;
}

static int
xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
{
	struct sock	*sk = sock->sk;

	if (xprt->inet)
		return -EBUSY;

	write_lock_bh(&sk->callback_lock);
	sk->user_data = xprt;
	xprt->old_data_ready = sk->data_ready;
	xprt->old_state_change = sk->state_change;
	xprt->old_write_space = sk->write_space;
	if (xprt->prot == IPPROTO_UDP) {
		sk->data_ready = udp_data_ready;
		sk->no_check = UDP_CSUM_NORCV;
		xprt_set_connected(xprt);
	} else {
		struct tcp_opt *tp = &(sk->tp_pinfo.af_tcp);
		tp->nonagle = 1;	/* disable Nagle's algorithm */
		sk->data_ready = tcp_data_ready;
		sk->state_change = tcp_state_change;
		xprt_clear_connected(xprt);
	}
	sk->write_space = xprt_write_space;

	/* Reset to new socket */
	xprt->sock = sock;
	xprt->inet = sk;
	write_unlock_bh(&sk->callback_lock);

	return 0;
}

/*
 * Set socket buffer length
 */
void
xprt_sock_setbufsize(struct rpc_xprt *xprt)
{
	struct sock *sk = xprt->inet;

	if (xprt->stream)
		return;
	if (xprt->rcvsize) {
		sk->userlocks |= SOCK_RCVBUF_LOCK;
		sk->rcvbuf = xprt->rcvsize * RPC_MAXCONG * 2;
	}
	if (xprt->sndsize) {
		sk->userlocks |= SOCK_SNDBUF_LOCK;
		sk->sndbuf = xprt->sndsize * RPC_MAXCONG * 2;
		sk->write_space(sk);
	}
}

/*
 * Create a client socket given the protocol and peer address.
 */
static struct socket *
xprt_create_socket(int proto, struct rpc_timeout *to, int resvport)
{
	struct socket	*sock;
	int		type, err;

	dprintk("RPC:      xprt_create_socket(%s %d)\n",
			   (proto == IPPROTO_UDP)? "udp" : "tcp", proto);

	type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;

	if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
		printk("RPC: can't create socket (%d).\n", -err);
		/* don't release an uninitialized socket pointer */
		return NULL;
	}

	/* bind to a reserved port */
	if (resvport && xprt_bindresvport(sock) < 0)
		goto failed;

	return sock;

failed:
	sock_release(sock);
	return NULL;
}

/*
 * Create an RPC client transport given the protocol and peer address.
 */
struct rpc_xprt *
xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
	struct rpc_xprt	*xprt;

	xprt = xprt_setup(proto, sap, to);
	if (!xprt)
		goto out_bad;

	dprintk("RPC:      xprt_create_proto created xprt %p\n", xprt);
	return xprt;
out_bad:
	dprintk("RPC:      xprt_create_proto failed\n");
	if (xprt)
		kfree(xprt);
	return NULL;
}

/*
 * Prepare for transport shutdown.
 */
void
xprt_shutdown(struct rpc_xprt *xprt)
{
	xprt->shutdown = 1;
	rpc_wake_up(&xprt->sending);
	rpc_wake_up(&xprt->resend);
	rpc_wake_up(&xprt->pending);
	rpc_wake_up(&xprt->backlog);
	if (waitqueue_active(&xprt->cong_wait))
		wake_up(&xprt->cong_wait);
}

/*
 * Clear the xprt backlog queue
 */
int
xprt_clear_backlog(struct rpc_xprt *xprt)
{
	rpc_wake_up_next(&xprt->backlog);
	if (waitqueue_active(&xprt->cong_wait))
		wake_up(&xprt->cong_wait);
	return 1;
}

/*
 * Destroy an RPC transport, killing off all requests.
 */
int
xprt_destroy(struct rpc_xprt *xprt)
{
	dprintk("RPC:      destroying transport %p\n", xprt);
	xprt_shutdown(xprt);
	xprt_close(xprt);
	kfree(xprt);

	return 0;
}