1 /*
2  * Copyright (c) 2006 Oracle.  All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the
8  * OpenIB.org BSD license below:
9  *
10  *     Redistribution and use in source and binary forms, with or
11  *     without modification, are permitted provided that the following
12  *     conditions are met:
13  *
14  *      - Redistributions of source code must retain the above
15  *        copyright notice, this list of conditions and the following
16  *        disclaimer.
17  *
18  *      - Redistributions in binary form must reproduce the above
19  *        copyright notice, this list of conditions and the following
20  *        disclaimer in the documentation and/or other materials
21  *        provided with the distribution.
22  *
23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30  * SOFTWARE.
31  *
32  */
33 #include <linux/kernel.h>
34 #include <linux/gfp.h>
35 #include <linux/in.h>
36 #include <net/tcp.h>
37 
38 #include "rds.h"
39 #include "tcp.h"
40 
/*
 * One global listening socket serviced by one statically declared work
 * item: cheesy, but simple..
 *
 * rds_tcp_listen_sock is set by rds_tcp_listen_init() and cleared by
 * rds_tcp_listen_stop(); rds_tcp_listen_work is queued on rds_wq from
 * rds_tcp_listen_data_ready() and runs rds_tcp_accept_worker().
 */
static struct socket *rds_tcp_listen_sock;

static void rds_tcp_accept_worker(struct work_struct *work);
static DECLARE_WORK(rds_tcp_listen_work, rds_tcp_accept_worker);
47 
rds_tcp_accept_one(struct socket * sock)48 static int rds_tcp_accept_one(struct socket *sock)
49 {
50 	struct socket *new_sock = NULL;
51 	struct rds_connection *conn;
52 	int ret;
53 	struct inet_sock *inet;
54 
55 	ret = sock_create_lite(sock->sk->sk_family, sock->sk->sk_type,
56 			       sock->sk->sk_protocol, &new_sock);
57 	if (ret)
58 		goto out;
59 
60 	new_sock->type = sock->type;
61 	new_sock->ops = sock->ops;
62 	ret = sock->ops->accept(sock, new_sock, O_NONBLOCK);
63 	if (ret < 0)
64 		goto out;
65 
66 	rds_tcp_tune(new_sock);
67 
68 	inet = inet_sk(new_sock->sk);
69 
70 	rdsdebug("accepted tcp %pI4:%u -> %pI4:%u\n",
71 		 &inet->inet_saddr, ntohs(inet->inet_sport),
72 		 &inet->inet_daddr, ntohs(inet->inet_dport));
73 
74 	conn = rds_conn_create(inet->inet_saddr, inet->inet_daddr,
75 			       &rds_tcp_transport, GFP_KERNEL);
76 	if (IS_ERR(conn)) {
77 		ret = PTR_ERR(conn);
78 		goto out;
79 	}
80 
81 	/*
82 	 * see the comment above rds_queue_delayed_reconnect()
83 	 */
84 	if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) {
85 		if (rds_conn_state(conn) == RDS_CONN_UP)
86 			rds_tcp_stats_inc(s_tcp_listen_closed_stale);
87 		else
88 			rds_tcp_stats_inc(s_tcp_connect_raced);
89 		rds_conn_drop(conn);
90 		ret = 0;
91 		goto out;
92 	}
93 
94 	rds_tcp_set_callbacks(new_sock, conn);
95 	rds_connect_complete(conn);
96 	new_sock = NULL;
97 	ret = 0;
98 
99 out:
100 	if (new_sock)
101 		sock_release(new_sock);
102 	return ret;
103 }
104 
rds_tcp_accept_worker(struct work_struct * work)105 static void rds_tcp_accept_worker(struct work_struct *work)
106 {
107 	while (rds_tcp_accept_one(rds_tcp_listen_sock) == 0)
108 		cond_resched();
109 }
110 
rds_tcp_listen_data_ready(struct sock * sk,int bytes)111 void rds_tcp_listen_data_ready(struct sock *sk, int bytes)
112 {
113 	void (*ready)(struct sock *sk, int bytes);
114 
115 	rdsdebug("listen data ready sk %p\n", sk);
116 
117 	read_lock_bh(&sk->sk_callback_lock);
118 	ready = sk->sk_user_data;
119 	if (!ready) { /* check for teardown race */
120 		ready = sk->sk_data_ready;
121 		goto out;
122 	}
123 
124 	/*
125 	 * ->sk_data_ready is also called for a newly established child socket
126 	 * before it has been accepted and the accepter has set up their
127 	 * data_ready.. we only want to queue listen work for our listening
128 	 * socket
129 	 */
130 	if (sk->sk_state == TCP_LISTEN)
131 		queue_work(rds_wq, &rds_tcp_listen_work);
132 
133 out:
134 	read_unlock_bh(&sk->sk_callback_lock);
135 	ready(sk, bytes);
136 }
137 
rds_tcp_listen_init(void)138 int rds_tcp_listen_init(void)
139 {
140 	struct sockaddr_in sin;
141 	struct socket *sock = NULL;
142 	int ret;
143 
144 	ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
145 	if (ret < 0)
146 		goto out;
147 
148 	sock->sk->sk_reuse = 1;
149 	rds_tcp_nonagle(sock);
150 
151 	write_lock_bh(&sock->sk->sk_callback_lock);
152 	sock->sk->sk_user_data = sock->sk->sk_data_ready;
153 	sock->sk->sk_data_ready = rds_tcp_listen_data_ready;
154 	write_unlock_bh(&sock->sk->sk_callback_lock);
155 
156 	sin.sin_family = PF_INET,
157 	sin.sin_addr.s_addr = (__force u32)htonl(INADDR_ANY);
158 	sin.sin_port = (__force u16)htons(RDS_TCP_PORT);
159 
160 	ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));
161 	if (ret < 0)
162 		goto out;
163 
164 	ret = sock->ops->listen(sock, 64);
165 	if (ret < 0)
166 		goto out;
167 
168 	rds_tcp_listen_sock = sock;
169 	sock = NULL;
170 out:
171 	if (sock)
172 		sock_release(sock);
173 	return ret;
174 }
175 
rds_tcp_listen_stop(void)176 void rds_tcp_listen_stop(void)
177 {
178 	struct socket *sock = rds_tcp_listen_sock;
179 	struct sock *sk;
180 
181 	if (!sock)
182 		return;
183 
184 	sk = sock->sk;
185 
186 	/* serialize with and prevent further callbacks */
187 	lock_sock(sk);
188 	write_lock_bh(&sk->sk_callback_lock);
189 	if (sk->sk_user_data) {
190 		sk->sk_data_ready = sk->sk_user_data;
191 		sk->sk_user_data = NULL;
192 	}
193 	write_unlock_bh(&sk->sk_callback_lock);
194 	release_sock(sk);
195 
196 	/* wait for accepts to stop and close the socket */
197 	flush_workqueue(rds_wq);
198 	sock_release(sock);
199 	rds_tcp_listen_sock = NULL;
200 }
201