/*
 * linux/net/sunrpc/xprt.c
 *
 * This is a generic RPC call interface supporting congestion avoidance,
 * and asynchronous calls.
 *
 * The interface works like this:
 *
 * - When a process places a call, it allocates a request slot if
 *   one is available. Otherwise, it sleeps on the backlog queue
 *   (xprt_reserve).
 * - Next, the caller puts together the RPC message, stuffs it into
 *   the request struct, and calls xprt_call().
 * - xprt_call transmits the message and installs the caller on the
 *   socket's wait list. At the same time, it installs a timer that
 *   is run after the packet's timeout has expired.
 * - When a packet arrives, the data_ready handler walks the list of
 *   pending requests for that socket. If a matching XID is found, the
 *   caller is woken up, and the timer removed.
 * - When no reply arrives within the timeout interval, the timer is
 *   fired by the kernel and runs xprt_timer(). It either adjusts the
 *   timeout values (minor timeout) or wakes up the caller with a status
 *   of -ETIMEDOUT.
 * - When the caller receives a notification from RPC that a reply arrived,
 *   it should release the RPC slot, and process the reply.
 *   If the call timed out, it may choose to retry the operation by
 *   adjusting the initial timeout value, and simply calling rpc_call
 *   again.
 *
 * Support for async RPC is done through a set of RPC-specific scheduling
 * primitives that `transparently' work for processes as well as async
 * tasks that rely on callbacks.
 *
 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 * TCP NFS related read + write fixes
 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 * Rewrite of large parts of the code in order to stabilize TCP stuff.
 * Fix behaviour when socket buffer is full.
 *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 */
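
/*
 * A rough sketch of the lifecycle described above, from the caller's
 * point of view (illustrative pseudo-code only, not actual client
 * code; the marshalling step in particular is elided):
 *
 *	xprt_reserve(task);	// grab a request slot, or sleep on backlog
 *	// ... marshal arguments into task->tk_rqstp ...
 *	xprt_transmit(task);	// send, queue on xprt->recv, arm the timer
 *	// ... the scheduler puts the task to sleep on xprt->pending ...
 *	xprt_release(task);	// on wakeup: free the slot; the reply is in
 *				// req->rq_rcv_buf, or status is -ETIMEDOUT
 */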

#define __KERNEL_SYSCALLS__

#include <linux/version.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/capability.h>
#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/unistd.h>
#include <linux/sunrpc/clnt.h>
#include <linux/file.h>

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>

#include <asm/uaccess.h>

/*
 * Local variables
 */

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

#define XPRT_MAX_BACKOFF	(8)

/*
 * Local functions
 */
static void xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static void do_xprt_transmit(struct rpc_task *);
static inline void do_xprt_reserve(struct rpc_task *);
static void xprt_disconnect(struct rpc_xprt *);
static void xprt_connect_status(struct rpc_task *task);
static struct socket *xprt_create_socket(int, struct rpc_timeout *, int);
static int xprt_bind_socket(struct rpc_xprt *, struct socket *);
static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);

#ifdef RPC_DEBUG_DATA
/*
 * Print the buffer contents (first 128 bytes only--just enough for
 * diropres return).
 */
static void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8 *buf = (u8 *) packet;
	int j;

	dprintk("RPC: %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
static inline void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif

/*
 * Look up RPC transport given an INET socket
 */
static inline struct rpc_xprt *
xprt_from_sock(struct sock *sk)
{
	return (struct rpc_xprt *) sk->user_data;
}

/*
 * Serialize write access to sockets, in order to prevent different
 * requests from interfering with each other.
 * Also prevents TCP socket connections from colliding with writes.
 */
static int
__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (!xprt->snd_task) {
		if (xprt->nocong || __xprt_get_cong(xprt, task)) {
			xprt->snd_task = task;
			if (req) {
				req->rq_bytes_sent = 0;
				req->rq_ntrans++;
			}
		}
	}
	if (xprt->snd_task != task) {
		dprintk("RPC: %4d TCP write queue full\n", task->tk_pid);
		task->tk_timeout = 0;
		task->tk_status = -EAGAIN;
		if (req && req->rq_ntrans)
			rpc_sleep_on(&xprt->resend, task, NULL, NULL);
		else
			rpc_sleep_on(&xprt->sending, task, NULL, NULL);
	}
	return xprt->snd_task == task;
}

static inline int
xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	int retval;

	spin_lock_bh(&xprt->sock_lock);
	retval = __xprt_lock_write(xprt, task);
	spin_unlock_bh(&xprt->sock_lock);
	return retval;
}

static void
__xprt_lock_write_next(struct rpc_xprt *xprt)
{
	struct rpc_task *task;

	if (xprt->snd_task)
		return;
	task = rpc_wake_up_next(&xprt->resend);
	if (!task) {
		if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
			return;
		task = rpc_wake_up_next(&xprt->sending);
		if (!task)
			return;
	}
	if (xprt->nocong || __xprt_get_cong(xprt, task)) {
		struct rpc_rqst *req = task->tk_rqstp;
		xprt->snd_task = task;
		if (req) {
			req->rq_bytes_sent = 0;
			req->rq_ntrans++;
		}
	}
}

/*
 * Releases the socket for use by other requests.
 */
static void
__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task)
		xprt->snd_task = NULL;
	__xprt_lock_write_next(xprt);
}

static inline void
xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	spin_lock_bh(&xprt->sock_lock);
	__xprt_release_write(xprt, task);
	spin_unlock_bh(&xprt->sock_lock);
}

/*
 * Write data to socket.
 */
static inline int
xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	struct socket *sock = xprt->sock;
	struct msghdr msg;
	struct xdr_buf *xdr = &req->rq_snd_buf;
	struct iovec niv[MAX_IOVEC];
	unsigned int niov, slen, skip;
	mm_segment_t oldfs;
	int result;

	if (!sock)
		return -ENOTCONN;

	xprt_pktdump("packet data:",
			req->rq_svec->iov_base,
			req->rq_svec->iov_len);

	/* Don't repeat bytes */
	skip = req->rq_bytes_sent;
	slen = xdr->len - skip;
	oldfs = get_fs(); set_fs(get_ds());
	do {
		unsigned int slen_part, n;

		niov = xdr_kmap(niv, xdr, skip);
		if (!niov) {
			result = -EAGAIN;
			break;
		}

		msg.msg_flags = MSG_DONTWAIT|MSG_NOSIGNAL;
		msg.msg_iov = niv;
		msg.msg_iovlen = niov;
		msg.msg_name = (struct sockaddr *) &xprt->addr;
		msg.msg_namelen = sizeof(xprt->addr);
		msg.msg_control = NULL;
		msg.msg_controllen = 0;

		slen_part = 0;
		for (n = 0; n < niov; n++)
			slen_part += niv[n].iov_len;

		clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
		result = sock_sendmsg(sock, &msg, slen_part);

		xdr_kunmap(xdr, skip, niov);

		skip += slen_part;
		slen -= slen_part;
	} while (result >= 0 && slen);
	set_fs(oldfs);

	dprintk("RPC: xprt_sendmsg(%d) = %d\n", slen, result);

	if (result >= 0)
		return result;

	switch (result) {
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED.
		 */
	case -EAGAIN:
		break;
	case -ECONNRESET:
	case -ENOTCONN:
	case -EPIPE:
		/* connection broken */
		if (xprt->stream)
			result = -ENOTCONN;
		break;
	default:
		printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
	}
	return result;
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (req->rq_cong)
		return 1;
	dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
			task->tk_pid, xprt->cong, xprt->cwnd);
	if (RPCXPRT_CONGESTED(xprt))
		return 0;
	req->rq_cong = 1;
	xprt->cong += RPC_CWNDSCALE;
	return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (!req->rq_cong)
		return;
	req->rq_cong = 0;
	xprt->cong -= RPC_CWNDSCALE;
	__xprt_lock_write_next(xprt);
}

/*
 * Adjust RPC congestion window
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
static void
xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{
	unsigned long cwnd;

	cwnd = xprt->cwnd;
	if (result >= 0 && cwnd <= xprt->cong) {
		/* The (cwnd >> 1) term makes sure
		 * the result gets rounded properly. */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND)
			cwnd = RPC_MAXCWND;
		__xprt_lock_write_next(xprt);
	} else if (result == -ETIMEDOUT) {
		cwnd >>= 1;
		if (cwnd < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
	}
	dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
			xprt->cong, xprt->cwnd, cwnd);
	xprt->cwnd = cwnd;
}
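
/*
 * A worked example of the update above (hedged: this assumes
 * RPC_CWNDSCALE is 256, i.e. one congestion-window "slot" in
 * fixed-point units, as in the sunrpc headers of this era): on a
 * successful reply with cwnd = 512 the increment is
 * (256*256 + 256) / 512 = 128, so cwnd grows to 640 -- roughly one
 * full slot per window of replies (additive increase). On -ETIMEDOUT
 * the window is simply halved (multiplicative decrease), but never
 * drops below one slot.
 */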

/*
 * Adjust timeout values etc for next retransmit
 */
int
xprt_adjust_timeout(struct rpc_timeout *to)
{
	if (to->to_retries > 0) {
		if (to->to_exponential)
			to->to_current <<= 1;
		else
			to->to_current += to->to_increment;
		if (to->to_maxval && to->to_current >= to->to_maxval)
			to->to_current = to->to_maxval;
	} else {
		if (to->to_exponential)
			to->to_initval <<= 1;
		else
			to->to_initval += to->to_increment;
		if (to->to_maxval && to->to_initval >= to->to_maxval)
			to->to_initval = to->to_maxval;
		to->to_current = to->to_initval;
	}

	if (!to->to_current) {
		printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
		to->to_current = 5 * HZ;
	}
	pprintk("RPC: %lu %s\n", jiffies,
			to->to_retries? "retrans" : "timeout");
	return to->to_retries-- > 0;
}
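
/*
 * For example (a sketch, not normative): with the default UDP timeout
 * set up by xprt_set_timeout(to, 5, 5 * HZ) below, to_exponential is
 * zero, so each retransmit adds another 5 seconds to to_current,
 * capped at to_maxval = 25 seconds; once to_retries reaches zero the
 * caller sees a major timeout. With to_exponential set, to_current
 * would instead double on every pass, subject to the same cap.
 */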

/*
 * Close down a transport socket
 */
static void
xprt_close(struct rpc_xprt *xprt)
{
	struct socket *sock = xprt->sock;
	struct sock *sk = xprt->inet;

	if (!sk)
		return;

	write_lock_bh(&sk->callback_lock);
	xprt->inet = NULL;
	xprt->sock = NULL;

	sk->user_data = NULL;
	sk->data_ready = xprt->old_data_ready;
	sk->state_change = xprt->old_state_change;
	sk->write_space = xprt->old_write_space;
	write_unlock_bh(&sk->callback_lock);

	xprt_disconnect(xprt);
	sk->no_check = 0;

	sock_release(sock);
}

/*
 * Mark a transport as disconnected
 */
static void
xprt_disconnect(struct rpc_xprt *xprt)
{
	dprintk("RPC: disconnected transport %p\n", xprt);
	spin_lock_bh(&xprt->sock_lock);
	xprt_clear_connected(xprt);
	rpc_wake_up_status(&xprt->pending, -ENOTCONN);
	spin_unlock_bh(&xprt->sock_lock);
}

/*
 * Reconnect a broken TCP connection.
 */
void
xprt_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct socket *sock = xprt->sock;
	struct sock *inet;
	int status;

	dprintk("RPC: %4d xprt_connect %p connected %d\n",
			task->tk_pid, xprt, xprt_connected(xprt));
	if (xprt->shutdown)
		return;

	if (!xprt->addr.sin_port) {
		task->tk_status = -EIO;
		return;
	}

	if (!xprt_lock_write(xprt, task))
		return;
	if (xprt_connected(xprt))
		goto out_write;

	if (task->tk_rqstp)
		task->tk_rqstp->rq_bytes_sent = 0;

	xprt_close(xprt);
	/* Create an unconnected socket */
	sock = xprt_create_socket(xprt->prot, &xprt->timeout, xprt->resvport);
	if (!sock) {
		/* couldn't create socket or bind to reserved port;
		 * this is likely a permanent error, so cause an abort */
		task->tk_status = -EIO;
		goto out_write;
	}
	xprt_bind_socket(xprt, sock);

	if (!xprt->stream)
		goto out_write;

	inet = sock->sk;

	/* Now connect it asynchronously. */
	dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
	status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
			sizeof(xprt->addr), O_NONBLOCK);
	dprintk("RPC: %4d connect status %d connected %d\n",
			task->tk_pid, status, xprt_connected(xprt));

	if (status >= 0)
		return;

	switch (status) {
	case -EALREADY:
	case -EINPROGRESS:
		/* Protect against TCP socket state changes */
		lock_sock(inet);
		if (inet->state != TCP_ESTABLISHED) {
			dprintk("RPC: %4d waiting for connection\n",
					task->tk_pid);
			task->tk_timeout = RPC_CONNECT_TIMEOUT;
			/* if the socket is already closing, delay briefly */
			if ((1 << inet->state) & ~(TCPF_SYN_SENT|TCPF_SYN_RECV))
				task->tk_timeout = RPC_REESTABLISH_TIMEOUT;
			rpc_sleep_on(&xprt->pending, task, xprt_connect_status,
					NULL);
		}
		release_sock(inet);
		break;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENOTCONN:
		if (!task->tk_client->cl_softrtry) {
			rpc_delay(task, RPC_REESTABLISH_TIMEOUT);
			task->tk_status = -ENOTCONN;
			break;
		}
	default:
		/* Report myriad other possible returns. If this file
		 * system is soft mounted, just error out, like Solaris. */
		if (task->tk_client->cl_softrtry) {
			printk(KERN_WARNING
				"RPC: error %d connecting to server %s, exiting\n",
				-status, task->tk_client->cl_server);
			task->tk_status = -EIO;
			goto out_write;
		}
		printk(KERN_WARNING "RPC: error %d connecting to server %s\n",
				-status, task->tk_client->cl_server);
		/* This will prevent anybody else from connecting */
		rpc_delay(task, RPC_REESTABLISH_TIMEOUT);
		task->tk_status = status;
		break;
	}
	return;
out_write:
	xprt_release_write(xprt, task);
}

/*
 * We arrive here when awoken from waiting on connection establishment.
 */
static void
xprt_connect_status(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	if (task->tk_status >= 0) {
		dprintk("RPC: %4d xprt_connect_status: connection established\n",
				task->tk_pid);
		return;
	}

	/* if soft mounted, cause this RPC to fail */
	if (task->tk_client->cl_softrtry)
		task->tk_status = -EIO;

	switch (task->tk_status) {
	case -ENOTCONN:
		rpc_delay(task, RPC_REESTABLISH_TIMEOUT);
		return;
	case -ETIMEDOUT:
		dprintk("RPC: %4d xprt_connect_status: timed out\n",
				task->tk_pid);
		break;
	default:
		printk(KERN_ERR "RPC: error %d connecting to server %s\n",
				-task->tk_status, task->tk_client->cl_server);
	}
	xprt_release_write(xprt, task);
}

/*
 * Look up the RPC request corresponding to a reply, and then lock it.
 */
static inline struct rpc_rqst *
xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
	struct list_head *pos;
	struct rpc_rqst *req = NULL;

	list_for_each(pos, &xprt->recv) {
		struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
		if (entry->rq_xid == xid) {
			req = entry;
			break;
		}
	}
	return req;
}

/*
 * Complete reply received.
 * The TCP code relies on us to remove the request from xprt->pending.
 */
static void
xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{
	struct rpc_task *task = req->rq_task;
	struct rpc_clnt *clnt = task->tk_client;

	/* Adjust congestion window */
	if (!xprt->nocong) {
		int timer = rpcproc_timer(clnt, task->tk_msg.rpc_proc);
		xprt_adjust_cwnd(xprt, copied);
		__xprt_put_cong(xprt, req);
		if (req->rq_ntrans == 1) {
			if (timer)
				rpc_update_rtt(&clnt->cl_rtt, timer, (long)jiffies - req->rq_xtime);
		}
		rpc_set_timeo(&clnt->cl_rtt, timer, req->rq_ntrans - 1);
	}

#ifdef RPC_PROFILE
	/* Profile only reads for now */
	if (copied > 1024) {
		static unsigned long nextstat = 0;
		static unsigned long pkt_rtt = 0, pkt_len = 0, pkt_cnt = 0;

		pkt_cnt++;
		pkt_len += req->rq_slen + copied;
		pkt_rtt += jiffies - req->rq_xtime;
		if (time_before(nextstat, jiffies)) {
			printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
			printk("RPC: %ld %ld %ld %ld stat\n",
					jiffies, pkt_cnt, pkt_len, pkt_rtt);
			pkt_rtt = pkt_len = pkt_cnt = 0;
			nextstat = jiffies + 5 * HZ;
		}
	}
#endif

	dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
	req->rq_received = copied;
	list_del_init(&req->rq_list);

	/* ... and wake up the process. */
	rpc_wake_up_task(task);
	return;
}

static size_t
skb_read_bits(skb_reader_t *desc, void *to, size_t len)
{
	if (len > desc->count)
		len = desc->count;
	skb_copy_bits(desc->skb, desc->offset, to, len);
	desc->count -= len;
	desc->offset += len;
	return len;
}

static size_t
skb_read_and_csum_bits(skb_reader_t *desc, void *to, size_t len)
{
	unsigned int csum2, pos;

	if (len > desc->count)
		len = desc->count;
	pos = desc->offset;
	csum2 = skb_copy_and_csum_bits(desc->skb, pos, to, len, 0);
	desc->csum = csum_block_add(desc->csum, csum2, pos);
	desc->count -= len;
	desc->offset += len;
	return len;
}

/*
 * We have set things up such that we perform the checksum of the UDP
 * packet in parallel with the copies into the RPC client iovec. -DaveM
 */
static int
csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
{
	skb_reader_t desc;

	desc.skb = skb;
	desc.offset = sizeof(struct udphdr);
	desc.count = skb->len - desc.offset;

	if (skb->ip_summed == CHECKSUM_UNNECESSARY)
		goto no_checksum;

	desc.csum = csum_partial(skb->data, desc.offset, skb->csum);
	xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_and_csum_bits);
	if (desc.offset != skb->len) {
		unsigned int csum2;
		csum2 = skb_checksum(skb, desc.offset, skb->len - desc.offset, 0);
		desc.csum = csum_block_add(desc.csum, csum2, desc.offset);
	}
	if ((unsigned short)csum_fold(desc.csum))
		return -1;
	return 0;
no_checksum:
	xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_bits);
	return 0;
}

/*
 * Input handler for RPC replies. Called from a bottom half and hence
 * atomic.
 */
static void
udp_data_ready(struct sock *sk, int len)
{
	struct rpc_task *task;
	struct rpc_xprt *xprt;
	struct rpc_rqst *rovr;
	struct sk_buff *skb;
	int err, repsize, copied;

	read_lock(&sk->callback_lock);
	dprintk("RPC: udp_data_ready...\n");
	if (sk->dead || !(xprt = xprt_from_sock(sk))) {
		printk("RPC: udp_data_ready request not found!\n");
		goto out;
	}

	dprintk("RPC: udp_data_ready client %p\n", xprt);

	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		goto out;

	if (xprt->shutdown)
		goto dropit;

	repsize = skb->len - sizeof(struct udphdr);
	if (repsize < 4) {
		printk("RPC: impossible RPC reply size %d!\n", repsize);
		goto dropit;
	}

	/* Look up and lock the request corresponding to the given XID */
	spin_lock(&xprt->sock_lock);
	rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr)));
	if (!rovr)
		goto out_unlock;
	task = rovr->rq_task;

	dprintk("RPC: %4d received reply\n", task->tk_pid);
	xprt_pktdump("packet data:",
			(u32 *) (skb->h.raw + sizeof(struct udphdr)), repsize);

	if ((copied = rovr->rq_private_buf.len) > repsize)
		copied = repsize;

	/* Suck it into the iovec, verify checksum if not done by hw. */
	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
		goto out_unlock;

	/* Something worked... */
	dst_confirm(skb->dst);

	xprt_complete_rqst(xprt, rovr, copied);

out_unlock:
	spin_unlock(&xprt->sock_lock);
dropit:
	skb_free_datagram(sk, skb);
out:
	if (sk->sleep && waitqueue_active(sk->sleep))
		wake_up_interruptible(sk->sleep);
	read_unlock(&sk->callback_lock);
}

/*
 * Copy from an skb into memory and shrink the skb.
 */
static inline size_t
tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
{
	if (len > desc->count)
		len = desc->count;
	skb_copy_bits(desc->skb, desc->offset, p, len);
	desc->offset += len;
	desc->count -= len;
	return len;
}

/*
 * TCP read fragment marker
 */
static inline void
tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	size_t len, used;
	char *p;

	p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
	len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
	used = tcp_copy_data(desc, p, len);
	xprt->tcp_offset += used;
	if (used != len)
		return;
	xprt->tcp_reclen = ntohl(xprt->tcp_recm);
	if (xprt->tcp_reclen & 0x80000000)
		xprt->tcp_flags |= XPRT_LAST_FRAG;
	else
		xprt->tcp_flags &= ~XPRT_LAST_FRAG;
	xprt->tcp_reclen &= 0x7fffffff;
	xprt->tcp_flags &= ~XPRT_COPY_RECM;
	xprt->tcp_offset = 0;
	/* Sanity check of the record length */
	if (xprt->tcp_reclen < 4) {
		printk(KERN_ERR "RPC: Invalid TCP record fragment length\n");
		xprt_disconnect(xprt);
	}
	dprintk("RPC: reading TCP record fragment of length %d\n",
			xprt->tcp_reclen);
}
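
/*
 * The record marker parsed above is the standard RPC-over-TCP framing
 * (RFC 1831): a 4-byte big-endian word whose top bit flags the last
 * fragment of a record and whose low 31 bits give the fragment length.
 * For example, a marker of 0x80000064 on the wire announces a final
 * fragment of 0x64 = 100 bytes, while 0x00001000 would announce a
 * 4096-byte fragment with more fragments to follow.
 */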

static void
tcp_check_recm(struct rpc_xprt *xprt)
{
	if (xprt->tcp_offset == xprt->tcp_reclen) {
		xprt->tcp_flags |= XPRT_COPY_RECM;
		xprt->tcp_offset = 0;
		if (xprt->tcp_flags & XPRT_LAST_FRAG) {
			xprt->tcp_flags &= ~XPRT_COPY_DATA;
			xprt->tcp_flags |= XPRT_COPY_XID;
			xprt->tcp_copied = 0;
		}
	}
}

/*
 * TCP read xid
 */
static inline void
tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	size_t len, used;
	char *p;

	len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
	dprintk("RPC: reading XID (%Zu bytes)\n", len);
	p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
	used = tcp_copy_data(desc, p, len);
	xprt->tcp_offset += used;
	if (used != len)
		return;
	xprt->tcp_flags &= ~XPRT_COPY_XID;
	xprt->tcp_flags |= XPRT_COPY_DATA;
	xprt->tcp_copied = 4;
	dprintk("RPC: reading reply for XID %08x\n", xprt->tcp_xid);
	tcp_check_recm(xprt);
}

/*
 * TCP read and complete request
 */
static inline void
tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	struct rpc_rqst *req;
	struct xdr_buf *rcvbuf;
	size_t len;

	/* Find and lock the request corresponding to this xid */
	spin_lock(&xprt->sock_lock);
	req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
	if (!req) {
		xprt->tcp_flags &= ~XPRT_COPY_DATA;
		dprintk("RPC: XID %08x request not found!\n",
				xprt->tcp_xid);
		spin_unlock(&xprt->sock_lock);
		return;
	}

	rcvbuf = &req->rq_private_buf;
	len = desc->count;
	if (len > xprt->tcp_reclen - xprt->tcp_offset) {
		skb_reader_t my_desc;

		len = xprt->tcp_reclen - xprt->tcp_offset;
		memcpy(&my_desc, desc, sizeof(my_desc));
		my_desc.count = len;
		xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
				&my_desc, tcp_copy_data);
		desc->count -= len;
		desc->offset += len;
	} else
		xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
				desc, tcp_copy_data);
	xprt->tcp_copied += len;
	xprt->tcp_offset += len;

	if (xprt->tcp_copied == req->rq_private_buf.len)
		xprt->tcp_flags &= ~XPRT_COPY_DATA;
	else if (xprt->tcp_offset == xprt->tcp_reclen) {
		if (xprt->tcp_flags & XPRT_LAST_FRAG)
			xprt->tcp_flags &= ~XPRT_COPY_DATA;
	}

	if (!(xprt->tcp_flags & XPRT_COPY_DATA)) {
		dprintk("RPC: %4d received reply complete\n",
				req->rq_task->tk_pid);
		xprt_complete_rqst(xprt, req, xprt->tcp_copied);
	}
	spin_unlock(&xprt->sock_lock);
	tcp_check_recm(xprt);
}

/*
 * TCP discard extra bytes from a short read
 */
static inline void
tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	size_t len;

	len = xprt->tcp_reclen - xprt->tcp_offset;
	if (len > desc->count)
		len = desc->count;
	desc->count -= len;
	desc->offset += len;
	xprt->tcp_offset += len;
	tcp_check_recm(xprt);
}

/*
 * TCP record receive routine
 * We first have to grab the record marker, then the XID, then the data.
 */
static int
tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
		unsigned int offset, size_t len)
{
	struct rpc_xprt *xprt = (struct rpc_xprt *) rd_desc->buf;
	skb_reader_t desc = { skb, offset, len };

	dprintk("RPC: tcp_data_recv\n");
	do {
		/* Read in a new fragment marker if necessary */
		/* Can we ever really expect to get completely empty fragments? */
		if (xprt->tcp_flags & XPRT_COPY_RECM) {
			tcp_read_fraghdr(xprt, &desc);
			continue;
		}
		/* Read in the xid if necessary */
		if (xprt->tcp_flags & XPRT_COPY_XID) {
			tcp_read_xid(xprt, &desc);
			continue;
		}
		/* Read in the request data */
		if (xprt->tcp_flags & XPRT_COPY_DATA) {
			tcp_read_request(xprt, &desc);
			continue;
		}
		/* Skip over any trailing bytes on short reads */
		tcp_read_discard(xprt, &desc);
	} while (desc.count);
	dprintk("RPC: tcp_data_recv done\n");
	return len - desc.count;
}

static void
tcp_data_ready(struct sock *sk, int bytes)
{
	struct rpc_xprt *xprt;
	read_descriptor_t rd_desc;

	read_lock(&sk->callback_lock);
	dprintk("RPC: tcp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk))) {
		printk("RPC: tcp_data_ready socket info not found!\n");
		goto out;
	}
	if (xprt->shutdown)
		goto out;

	/* We use rd_desc to pass struct xprt to tcp_data_recv */
	rd_desc.buf = (char *) xprt;
	rd_desc.count = 65536;
	tcp_read_sock(sk, &rd_desc, tcp_data_recv);
out:
	read_unlock(&sk->callback_lock);
}

static void
tcp_state_change(struct sock *sk)
{
	struct rpc_xprt *xprt;

	read_lock(&sk->callback_lock);
	if (!(xprt = xprt_from_sock(sk)))
		goto out;
	dprintk("RPC: tcp_state_change client %p...\n", xprt);
	dprintk("RPC: state %x conn %d dead %d zapped %d\n",
			sk->state, xprt_connected(xprt),
			sk->dead, sk->zapped);

	switch (sk->state) {
	case TCP_ESTABLISHED:
		if (xprt_test_and_set_connected(xprt))
			break;

		/* Reset TCP record info */
		xprt->tcp_offset = 0;
		xprt->tcp_reclen = 0;
		xprt->tcp_copied = 0;
		xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;

		spin_lock_bh(&xprt->sock_lock);
		if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
			rpc_wake_up_task(xprt->snd_task);
		spin_unlock_bh(&xprt->sock_lock);
		break;
	case TCP_SYN_SENT:
	case TCP_SYN_RECV:
		break;
	default:
		xprt_disconnect(xprt);
		break;
	}
out:
	if (sk->sleep && waitqueue_active(sk->sleep))
		wake_up_interruptible_all(sk->sleep);
	read_unlock(&sk->callback_lock);
}

/*
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing sock_sendmsg
 * with a bunch of small requests.
 */
static void
xprt_write_space(struct sock *sk)
{
	struct rpc_xprt *xprt;
	struct socket *sock;

	read_lock(&sk->callback_lock);
	if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->socket))
		goto out;
	if (xprt->shutdown)
		goto out;

	/* Wait until we have enough socket memory */
	if (xprt->stream) {
		/* from net/ipv4/tcp.c:tcp_write_space */
		if (tcp_wspace(sk) < tcp_min_write_space(sk))
			goto out;
	} else {
		/* from net/core/sock.c:sock_def_write_space */
		if (!sock_writeable(sk))
			goto out;
	}

	if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags))
		goto out;

	spin_lock_bh(&xprt->sock_lock);
	if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
		rpc_wake_up_task(xprt->snd_task);
	spin_unlock_bh(&xprt->sock_lock);
	if (sk->sleep && waitqueue_active(sk->sleep))
		wake_up_interruptible(sk->sleep);
out:
	read_unlock(&sk->callback_lock);
}

/*
 * RPC receive timeout handler.
 */
static void
xprt_timer(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	spin_lock(&xprt->sock_lock);
	if (req->rq_received)
		goto out;

	xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT);
	__xprt_put_cong(xprt, req);

	dprintk("RPC: %4d xprt_timer (%s request)\n",
			task->tk_pid, req ? "pending" : "backlogged");

	task->tk_status = -ETIMEDOUT;
out:
	task->tk_timeout = 0;
	rpc_wake_up_task(task);
	spin_unlock(&xprt->sock_lock);
}

/*
 * Place the actual RPC call.
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void
xprt_transmit(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid,
			*(u32 *)(req->rq_svec[0].iov_base));

	if (xprt->shutdown)
		task->tk_status = -EIO;

	if (task->tk_status < 0)
		return;

	if (task->tk_rpcwait)
		rpc_remove_wait_queue(task);

	/* set up everything as needed. */
	/* Write the record marker */
	if (xprt->stream) {
		u32 *marker = req->rq_svec[0].iov_base;

		*marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
	}

	spin_lock_bh(&xprt->sock_lock);
	if (req->rq_received != 0 && !req->rq_bytes_sent)
		goto out_notrans;

	if (!__xprt_lock_write(xprt, task))
		goto out_notrans;

	if (!xprt_connected(xprt)) {
		task->tk_status = -ENOTCONN;
		goto out_notrans;
	}

	if (list_empty(&req->rq_list)) {
		/* Update the softirq receive buffer */
		memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
				sizeof(req->rq_private_buf));
		list_add_tail(&req->rq_list, &xprt->recv);
	}
	spin_unlock_bh(&xprt->sock_lock);

	do_xprt_transmit(task);
	return;
out_notrans:
	spin_unlock_bh(&xprt->sock_lock);
}

static void
do_xprt_transmit(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	int status, retry = 0;

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called xprt_sendmsg().
	 */
	while (1) {
		req->rq_xtime = jiffies;
		status = xprt_sendmsg(xprt, req);

		if (status < 0)
			break;

		if (xprt->stream) {
			req->rq_bytes_sent += status;

			/* If we've sent the entire packet, immediately
			 * reset the count of bytes sent. */
			if (req->rq_bytes_sent >= req->rq_slen) {
				req->rq_bytes_sent = 0;
				goto out_receive;
			}
		} else {
			if (status >= req->rq_slen)
				goto out_receive;
			status = -EAGAIN;
			break;
		}

		dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
				task->tk_pid, req->rq_slen - req->rq_bytes_sent,
				req->rq_slen);

		status = -EAGAIN;
		if (retry++ > 50)
			break;
	}

	/* If we're doing a resend and have received a reply already,
	 * then exit early.
	 * Note, though, that we can't do this if we've already started
	 * resending down a TCP stream.
	 */
	task->tk_status = status;

	switch (status) {
	case -EAGAIN:
		if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
			/* Protect against races with xprt_write_space */
			spin_lock_bh(&xprt->sock_lock);
			/* Don't race with disconnect */
			if (!xprt_connected(xprt))
				task->tk_status = -ENOTCONN;
			else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) {
				task->tk_timeout = req->rq_timeout.to_current;
				rpc_sleep_on(&xprt->pending, task, NULL, NULL);
			}
			spin_unlock_bh(&xprt->sock_lock);
			return;
		}
		/* Keep holding the socket if it is blocked */
		rpc_delay(task, HZ>>4);
		return;
	case -ECONNREFUSED:
		task->tk_timeout = RPC_REESTABLISH_TIMEOUT;
		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
		/* fall through */
	case -ENOTCONN:
		return;
	default:
		if (xprt->stream)
			xprt_disconnect(xprt);
	}
	xprt_release_write(xprt, task);
	return;
out_receive:
	dprintk("RPC: %4d xmit complete\n", task->tk_pid);
	spin_lock_bh(&xprt->sock_lock);
	/* Set the task's receive timeout value */
	if (!xprt->nocong) {
		int timer = rpcproc_timer(clnt, task->tk_msg.rpc_proc);
		task->tk_timeout = rpc_calc_rto(&clnt->cl_rtt, timer);
		task->tk_timeout <<= rpc_ntimeo(&clnt->cl_rtt, timer);
		task->tk_timeout <<= clnt->cl_timeout.to_retries
			- req->rq_timeout.to_retries;
		if (task->tk_timeout > req->rq_timeout.to_maxval)
			task->tk_timeout = req->rq_timeout.to_maxval;
	} else
		task->tk_timeout = req->rq_timeout.to_current;
	/* Don't race with disconnect */
	if (!xprt_connected(xprt))
		task->tk_status = -ENOTCONN;
	else if (!req->rq_received)
		rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
	__xprt_release_write(xprt, task);
	spin_unlock_bh(&xprt->sock_lock);
}

/*
 * Reserve an RPC call slot.
 */
static inline void
do_xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp)
		return;
	if (xprt->free) {
		struct rpc_rqst *req = xprt->free;
		xprt->free = req->rq_next;
		req->rq_next = NULL;
		task->tk_rqstp = req;
		xprt_request_init(task, xprt);
		return;
	}
	dprintk("RPC: waiting for request slot\n");
	task->tk_status = -EAGAIN;
	task->tk_timeout = 0;
	rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
}

void
xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = -EIO;
	if (!xprt->shutdown) {
		spin_lock(&xprt->xprt_lock);
		do_xprt_reserve(task);
		spin_unlock(&xprt->xprt_lock);
	}
}

/*
 * Allocate a 'unique' XID
 */
static u32
xprt_alloc_xid(void)
{
	static spinlock_t xid_lock = SPIN_LOCK_UNLOCKED;
	static int need_init = 1;
	static u32 xid;
	u32 ret;

	spin_lock(&xid_lock);
	if (unlikely(need_init)) {
		xid = CURRENT_TIME << 12;
		need_init = 0;
	}
	ret = xid++;
	spin_unlock(&xid_lock);
	return ret;
}
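
/*
 * A note on the seeding above (our reading, not a comment from the
 * original author): starting the XID counter from a boot-time-dependent
 * value (CURRENT_TIME shifted up by 12 bits) makes it unlikely that
 * XIDs issued after a reboot collide with replies still in flight for
 * the previous incarnation; the 12-bit shift leaves room for roughly
 * 4096 allocations per second of uptime before the sequences of two
 * consecutive boots could overlap.
 */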

/*
 * Initialize RPC request
 */
static void
xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
	struct rpc_rqst *req = task->tk_rqstp;

	req->rq_timeout = xprt->timeout;
	req->rq_task = task;
	req->rq_xprt = xprt;
	req->rq_xid = xprt_alloc_xid();
	INIT_LIST_HEAD(&req->rq_list);
	dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
			req, req->rq_xid);
}

/*
 * Release an RPC call slot
 */
void
xprt_release(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct rpc_rqst *req;

	if (!(req = task->tk_rqstp))
		return;
	spin_lock_bh(&xprt->sock_lock);
	__xprt_release_write(xprt, task);
	__xprt_put_cong(xprt, req);
	if (!list_empty(&req->rq_list))
		list_del(&req->rq_list);
	spin_unlock_bh(&xprt->sock_lock);
	task->tk_rqstp = NULL;
	memset(req, 0, sizeof(*req)); /* mark unused */

	dprintk("RPC: %4d release request %p\n", task->tk_pid, req);

	spin_lock(&xprt->xprt_lock);
	req->rq_next = xprt->free;
	xprt->free = req;

	xprt_clear_backlog(xprt);
	spin_unlock(&xprt->xprt_lock);
}

/*
 * Set default timeout parameters
 */
void
xprt_default_timeout(struct rpc_timeout *to, int proto)
{
	if (proto == IPPROTO_UDP)
		xprt_set_timeout(to, 5, 5 * HZ);
	else
		xprt_set_timeout(to, 5, 60 * HZ);
}

/*
 * Set constant timeout
 */
void
xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{
	to->to_current =
	to->to_initval =
	to->to_increment = incr;
	to->to_maxval = incr * retr;
	to->to_retries = retr;
	to->to_exponential = 0;
}
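
/*
 * So the defaults above work out as follows (straightforward
 * arithmetic from the two calls in xprt_default_timeout): UDP starts
 * at a 5 second timeout with 5 linear retries up to a 25 second cap,
 * while TCP starts at 60 seconds with the same retry count up to
 * 300 seconds -- the longer TCP values reflecting that the stream
 * transport handles its own retransmission.
 */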

/*
 * Initialize an RPC client
 */
static struct rpc_xprt *
xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
{
	struct rpc_xprt *xprt;
	struct rpc_rqst *req;
	int i;

	dprintk("RPC: setting up %s transport...\n",
			proto == IPPROTO_UDP? "UDP" : "TCP");

	if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
		return NULL;
	memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */

	xprt->addr = *ap;
	xprt->prot = proto;
	xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
	if (xprt->stream) {
		xprt->cwnd = RPC_MAXCWND;
		xprt->nocong = 1;
	} else
		xprt->cwnd = RPC_INITCWND;
	spin_lock_init(&xprt->sock_lock);
	spin_lock_init(&xprt->xprt_lock);
	init_waitqueue_head(&xprt->cong_wait);

	INIT_LIST_HEAD(&xprt->recv);

	/* Set timeout parameters */
	if (to) {
		xprt->timeout = *to;
		xprt->timeout.to_current = to->to_initval;
	} else
		xprt_default_timeout(&xprt->timeout, xprt->prot);

	INIT_RPC_WAITQ(&xprt->pending, "xprt_pending");
	INIT_RPC_WAITQ(&xprt->sending, "xprt_sending");
	INIT_RPC_WAITQ(&xprt->resend, "xprt_resend");
	INIT_RPC_WAITQ(&xprt->backlog, "xprt_backlog");

	/* initialize free list */
	for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
		req->rq_next = req + 1;
	req->rq_next = NULL;
	xprt->free = xprt->slot;

	/* Check whether we want to use a reserved port */
	xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;

	dprintk("RPC: created transport %p\n", xprt);

	return xprt;
}

/*
 * Bind to a reserved port
 */
static inline int
xprt_bindresvport(struct socket *sock)
{
	struct sockaddr_in myaddr;
	int err, port;
	kernel_cap_t saved_cap = current->cap_effective;

	/* Override capabilities.
	 * They were checked in xprt_create_proto i.e. at mount time
	 */
	cap_raise (current->cap_effective, CAP_NET_BIND_SERVICE);

	memset(&myaddr, 0, sizeof(myaddr));
	myaddr.sin_family = AF_INET;
	port = 800;
	do {
		myaddr.sin_port = htons(port);
		err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
				sizeof(myaddr));
	} while (err == -EADDRINUSE && --port > 0);
	current->cap_effective = saved_cap;

	if (err < 0)
		printk("RPC: Can't bind to reserved port (%d).\n", -err);

	return err;
}

static int
xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
{
	struct sock *sk = sock->sk;

	if (xprt->inet)
		return -EBUSY;

	write_lock_bh(&sk->callback_lock);
	sk->user_data = xprt;
	xprt->old_data_ready = sk->data_ready;
	xprt->old_state_change = sk->state_change;
	xprt->old_write_space = sk->write_space;
	if (xprt->prot == IPPROTO_UDP) {
		sk->data_ready = udp_data_ready;
		sk->no_check = UDP_CSUM_NORCV;
		xprt_set_connected(xprt);
	} else {
		struct tcp_opt *tp = &(sk->tp_pinfo.af_tcp);
		tp->nonagle = 1; /* disable Nagle's algorithm */
		sk->data_ready = tcp_data_ready;
		sk->state_change = tcp_state_change;
		xprt_clear_connected(xprt);
	}
	sk->write_space = xprt_write_space;

	/* Reset to new socket */
	xprt->sock = sock;
	xprt->inet = sk;
	write_unlock_bh(&sk->callback_lock);

	return 0;
}

/*
 * Set socket buffer length
 */
void
xprt_sock_setbufsize(struct rpc_xprt *xprt)
{
	struct sock *sk = xprt->inet;

	if (xprt->stream)
		return;
	if (xprt->rcvsize) {
		sk->userlocks |= SOCK_RCVBUF_LOCK;
		sk->rcvbuf = xprt->rcvsize * RPC_MAXCONG * 2;
	}
	if (xprt->sndsize) {
		sk->userlocks |= SOCK_SNDBUF_LOCK;
		sk->sndbuf = xprt->sndsize * RPC_MAXCONG * 2;
		sk->write_space(sk);
	}
}

/*
 * Create a client socket given the protocol and peer address.
 */
static struct socket *
xprt_create_socket(int proto, struct rpc_timeout *to, int resvport)
{
	struct socket *sock;
	int type, err;

	dprintk("RPC: xprt_create_socket(%s %d)\n",
			(proto == IPPROTO_UDP)? "udp" : "tcp", proto);

	type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;

	if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
		printk("RPC: can't create socket (%d).\n", -err);
		return NULL;	/* no socket was created, so none to release */
	}

	/* bind to a reserved port */
	if (resvport && xprt_bindresvport(sock) < 0)
		goto failed;

	return sock;

failed:
	sock_release(sock);
	return NULL;
}

/*
 * Create an RPC client transport given the protocol and peer address.
 */
struct rpc_xprt *
xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
	struct rpc_xprt *xprt;

	xprt = xprt_setup(proto, sap, to);
	if (!xprt)
		goto out_bad;

	dprintk("RPC: xprt_create_proto created xprt %p\n", xprt);
	return xprt;
out_bad:
	dprintk("RPC: xprt_create_proto failed\n");
	if (xprt)
		kfree(xprt);
	return NULL;
}

/*
 * Prepare for transport shutdown.
 */
void
xprt_shutdown(struct rpc_xprt *xprt)
{
	xprt->shutdown = 1;
	rpc_wake_up(&xprt->sending);
	rpc_wake_up(&xprt->resend);
	rpc_wake_up(&xprt->pending);
	rpc_wake_up(&xprt->backlog);
	if (waitqueue_active(&xprt->cong_wait))
		wake_up(&xprt->cong_wait);
}

/*
 * Clear the xprt backlog queue
 */
int
xprt_clear_backlog(struct rpc_xprt *xprt)
{
	rpc_wake_up_next(&xprt->backlog);
	if (waitqueue_active(&xprt->cong_wait))
		wake_up(&xprt->cong_wait);
	return 1;
}

/*
 * Destroy an RPC transport, killing off all requests.
 */
int
xprt_destroy(struct rpc_xprt *xprt)
{
	dprintk("RPC: destroying transport %p\n", xprt);
	xprt_shutdown(xprt);
	xprt_close(xprt);
	kfree(xprt);

	return 0;
}