1 #![allow(clippy::collapsible_if)]
2
3 mod utils;
4
5 use std::cmp;
6 use std::io::{Read, Write};
7 use std::net::TcpStream;
8 use std::os::unix::io::AsRawFd;
9 use std::sync::atomic::{AtomicBool, Ordering};
10 use std::thread;
11
12 use smoltcp::iface::{Config, Interface, SocketSet};
13 use smoltcp::phy::{wait as phy_wait, Device, Medium};
14 use smoltcp::socket::tcp;
15 use smoltcp::time::{Duration, Instant};
16 use smoltcp::wire::{EthernetAddress, IpAddress, IpCidr};
17
/// Total number of bytes moved in one benchmark run.
const AMOUNT: usize = 1_000_000_000;

/// Direction in which the host-side client exercises the connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Client {
    /// The client reads from the stream (it connects to port 1234).
    Reader,
    /// The client writes to the stream (it connects to port 1235).
    Writer,
}
24
client(kind: Client)25 fn client(kind: Client) {
26 let port = match kind {
27 Client::Reader => 1234,
28 Client::Writer => 1235,
29 };
30 let mut stream = TcpStream::connect(("192.168.69.1", port)).unwrap();
31 let mut buffer = vec![0; 1_000_000];
32
33 let start = Instant::now();
34
35 let mut processed = 0;
36 while processed < AMOUNT {
37 let length = cmp::min(buffer.len(), AMOUNT - processed);
38 let result = match kind {
39 Client::Reader => stream.read(&mut buffer[..length]),
40 Client::Writer => stream.write(&buffer[..length]),
41 };
42 match result {
43 Ok(0) => break,
44 Ok(result) => {
45 // print!("(P:{})", result);
46 processed += result
47 }
48 Err(err) => panic!("cannot process: {err}"),
49 }
50 }
51
52 let end = Instant::now();
53
54 let elapsed = (end - start).total_millis() as f64 / 1000.0;
55
56 println!("throughput: {:.3} Gbps", AMOUNT as f64 / elapsed / 0.125e9);
57
58 CLIENT_DONE.store(true, Ordering::SeqCst);
59 }
60
61 static CLIENT_DONE: AtomicBool = AtomicBool::new(false);
62
main()63 fn main() {
64 #[cfg(feature = "log")]
65 utils::setup_logging("info");
66
67 let (mut opts, mut free) = utils::create_options();
68 utils::add_tuntap_options(&mut opts, &mut free);
69 utils::add_middleware_options(&mut opts, &mut free);
70 free.push("MODE");
71
72 let mut matches = utils::parse_options(&opts, free);
73 let device = utils::parse_tuntap_options(&mut matches);
74 let fd = device.as_raw_fd();
75 let mut device =
76 utils::parse_middleware_options(&mut matches, device, /*loopback=*/ false);
77 let mode = match matches.free[0].as_ref() {
78 "reader" => Client::Reader,
79 "writer" => Client::Writer,
80 _ => panic!("invalid mode"),
81 };
82
83 let tcp1_rx_buffer = tcp::SocketBuffer::new(vec![0; 65535]);
84 let tcp1_tx_buffer = tcp::SocketBuffer::new(vec![0; 65535]);
85 let tcp1_socket = tcp::Socket::new(tcp1_rx_buffer, tcp1_tx_buffer);
86
87 let tcp2_rx_buffer = tcp::SocketBuffer::new(vec![0; 65535]);
88 let tcp2_tx_buffer = tcp::SocketBuffer::new(vec![0; 65535]);
89 let tcp2_socket = tcp::Socket::new(tcp2_rx_buffer, tcp2_tx_buffer);
90
91 let mut config = Config::new();
92 config.random_seed = rand::random();
93 if device.capabilities().medium == Medium::Ethernet {
94 config.hardware_addr = Some(EthernetAddress([0x02, 0x00, 0x00, 0x00, 0x00, 0x01]).into());
95 }
96
97 let mut iface = Interface::new(config, &mut device);
98 iface.update_ip_addrs(|ip_addrs| {
99 ip_addrs
100 .push(IpCidr::new(IpAddress::v4(192, 168, 69, 1), 24))
101 .unwrap();
102 });
103
104 let mut sockets = SocketSet::new(vec![]);
105 let tcp1_handle = sockets.add(tcp1_socket);
106 let tcp2_handle = sockets.add(tcp2_socket);
107 let default_timeout = Some(Duration::from_millis(1000));
108
109 thread::spawn(move || client(mode));
110 let mut processed = 0;
111 while !CLIENT_DONE.load(Ordering::SeqCst) {
112 let timestamp = Instant::now();
113 iface.poll(timestamp, &mut device, &mut sockets);
114
115 // tcp:1234: emit data
116 let socket = sockets.get_mut::<tcp::Socket>(tcp1_handle);
117 if !socket.is_open() {
118 socket.listen(1234).unwrap();
119 }
120
121 if socket.can_send() {
122 if processed < AMOUNT {
123 let length = socket
124 .send(|buffer| {
125 let length = cmp::min(buffer.len(), AMOUNT - processed);
126 (length, length)
127 })
128 .unwrap();
129 processed += length;
130 }
131 }
132
133 // tcp:1235: sink data
134 let socket = sockets.get_mut::<tcp::Socket>(tcp2_handle);
135 if !socket.is_open() {
136 socket.listen(1235).unwrap();
137 }
138
139 if socket.can_recv() {
140 if processed < AMOUNT {
141 let length = socket
142 .recv(|buffer| {
143 let length = cmp::min(buffer.len(), AMOUNT - processed);
144 (length, length)
145 })
146 .unwrap();
147 processed += length;
148 }
149 }
150
151 match iface.poll_at(timestamp, &sockets) {
152 Some(poll_at) if timestamp < poll_at => {
153 phy_wait(fd, Some(poll_at - timestamp)).expect("wait error");
154 }
155 Some(_) => (),
156 None => {
157 phy_wait(fd, default_timeout).expect("wait error");
158 }
159 }
160 }
161 }
162