1 #![allow(clippy::collapsible_if)]
2 
3 mod utils;
4 
5 use std::cmp;
6 use std::io::{Read, Write};
7 use std::net::TcpStream;
8 use std::os::unix::io::AsRawFd;
9 use std::sync::atomic::{AtomicBool, Ordering};
10 use std::thread;
11 
12 use smoltcp::iface::{Config, Interface, SocketSet};
13 use smoltcp::phy::{wait as phy_wait, Device, Medium};
14 use smoltcp::socket::tcp;
15 use smoltcp::time::{Duration, Instant};
16 use smoltcp::wire::{EthernetAddress, IpAddress, IpCidr};
17 
18 const AMOUNT: usize = 1_000_000_000;
19 
/// Direction in which the host-side benchmark client moves data.
///
/// The variant selects both the TCP port the client connects to and whether
/// it calls `read` or `write` on the resulting stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Client {
    /// Connects to port 1234 and reads from the stream.
    Reader,
    /// Connects to port 1235 and writes to the stream.
    Writer,
}
24 
client(kind: Client)25 fn client(kind: Client) {
26     let port = match kind {
27         Client::Reader => 1234,
28         Client::Writer => 1235,
29     };
30     let mut stream = TcpStream::connect(("192.168.69.1", port)).unwrap();
31     let mut buffer = vec![0; 1_000_000];
32 
33     let start = Instant::now();
34 
35     let mut processed = 0;
36     while processed < AMOUNT {
37         let length = cmp::min(buffer.len(), AMOUNT - processed);
38         let result = match kind {
39             Client::Reader => stream.read(&mut buffer[..length]),
40             Client::Writer => stream.write(&buffer[..length]),
41         };
42         match result {
43             Ok(0) => break,
44             Ok(result) => {
45                 // print!("(P:{})", result);
46                 processed += result
47             }
48             Err(err) => panic!("cannot process: {err}"),
49         }
50     }
51 
52     let end = Instant::now();
53 
54     let elapsed = (end - start).total_millis() as f64 / 1000.0;
55 
56     println!("throughput: {:.3} Gbps", AMOUNT as f64 / elapsed / 0.125e9);
57 
58     CLIENT_DONE.store(true, Ordering::SeqCst);
59 }
60 
/// Completion flag shared between the client thread and `main`.
///
/// `client` stores `true` after printing its throughput figure; the polling
/// loop in `main` keeps running for as long as it reads `false`.
static CLIENT_DONE: AtomicBool = AtomicBool::new(false);
62 
main()63 fn main() {
64     #[cfg(feature = "log")]
65     utils::setup_logging("info");
66 
67     let (mut opts, mut free) = utils::create_options();
68     utils::add_tuntap_options(&mut opts, &mut free);
69     utils::add_middleware_options(&mut opts, &mut free);
70     free.push("MODE");
71 
72     let mut matches = utils::parse_options(&opts, free);
73     let device = utils::parse_tuntap_options(&mut matches);
74     let fd = device.as_raw_fd();
75     let mut device =
76         utils::parse_middleware_options(&mut matches, device, /*loopback=*/ false);
77     let mode = match matches.free[0].as_ref() {
78         "reader" => Client::Reader,
79         "writer" => Client::Writer,
80         _ => panic!("invalid mode"),
81     };
82 
83     let tcp1_rx_buffer = tcp::SocketBuffer::new(vec![0; 65535]);
84     let tcp1_tx_buffer = tcp::SocketBuffer::new(vec![0; 65535]);
85     let tcp1_socket = tcp::Socket::new(tcp1_rx_buffer, tcp1_tx_buffer);
86 
87     let tcp2_rx_buffer = tcp::SocketBuffer::new(vec![0; 65535]);
88     let tcp2_tx_buffer = tcp::SocketBuffer::new(vec![0; 65535]);
89     let tcp2_socket = tcp::Socket::new(tcp2_rx_buffer, tcp2_tx_buffer);
90 
91     let mut config = Config::new();
92     config.random_seed = rand::random();
93     if device.capabilities().medium == Medium::Ethernet {
94         config.hardware_addr = Some(EthernetAddress([0x02, 0x00, 0x00, 0x00, 0x00, 0x01]).into());
95     }
96 
97     let mut iface = Interface::new(config, &mut device);
98     iface.update_ip_addrs(|ip_addrs| {
99         ip_addrs
100             .push(IpCidr::new(IpAddress::v4(192, 168, 69, 1), 24))
101             .unwrap();
102     });
103 
104     let mut sockets = SocketSet::new(vec![]);
105     let tcp1_handle = sockets.add(tcp1_socket);
106     let tcp2_handle = sockets.add(tcp2_socket);
107     let default_timeout = Some(Duration::from_millis(1000));
108 
109     thread::spawn(move || client(mode));
110     let mut processed = 0;
111     while !CLIENT_DONE.load(Ordering::SeqCst) {
112         let timestamp = Instant::now();
113         iface.poll(timestamp, &mut device, &mut sockets);
114 
115         // tcp:1234: emit data
116         let socket = sockets.get_mut::<tcp::Socket>(tcp1_handle);
117         if !socket.is_open() {
118             socket.listen(1234).unwrap();
119         }
120 
121         if socket.can_send() {
122             if processed < AMOUNT {
123                 let length = socket
124                     .send(|buffer| {
125                         let length = cmp::min(buffer.len(), AMOUNT - processed);
126                         (length, length)
127                     })
128                     .unwrap();
129                 processed += length;
130             }
131         }
132 
133         // tcp:1235: sink data
134         let socket = sockets.get_mut::<tcp::Socket>(tcp2_handle);
135         if !socket.is_open() {
136             socket.listen(1235).unwrap();
137         }
138 
139         if socket.can_recv() {
140             if processed < AMOUNT {
141                 let length = socket
142                     .recv(|buffer| {
143                         let length = cmp::min(buffer.len(), AMOUNT - processed);
144                         (length, length)
145                     })
146                     .unwrap();
147                 processed += length;
148             }
149         }
150 
151         match iface.poll_at(timestamp, &sockets) {
152             Some(poll_at) if timestamp < poll_at => {
153                 phy_wait(fd, Some(poll_at - timestamp)).expect("wait error");
154             }
155             Some(_) => (),
156             None => {
157                 phy_wait(fd, default_timeout).expect("wait error");
158             }
159         }
160     }
161 }
162