1 use core::cmp::min; 2 #[cfg(feature = "async")] 3 use core::task::Waker; 4 5 use crate::iface::Context; 6 use crate::socket::PollAt; 7 #[cfg(feature = "async")] 8 use crate::socket::WakerRegistration; 9 use crate::storage::Empty; 10 use crate::wire::{IpEndpoint, IpListenEndpoint, IpProtocol, IpRepr, UdpRepr}; 11 12 /// A UDP packet metadata. 13 pub type PacketMetadata = crate::storage::PacketMetadata<IpEndpoint>; 14 15 /// A UDP packet ring buffer. 16 pub type PacketBuffer<'a> = crate::storage::PacketBuffer<'a, IpEndpoint>; 17 18 /// Error returned by [`Socket::bind`] 19 #[derive(Debug, PartialEq, Eq, Clone, Copy)] 20 #[cfg_attr(feature = "defmt", derive(defmt::Format))] 21 pub enum BindError { 22 InvalidState, 23 Unaddressable, 24 } 25 26 /// Error returned by [`Socket::send`] 27 #[derive(Debug, PartialEq, Eq, Clone, Copy)] 28 #[cfg_attr(feature = "defmt", derive(defmt::Format))] 29 pub enum SendError { 30 Unaddressable, 31 BufferFull, 32 } 33 34 /// Error returned by [`Socket::recv`] 35 #[derive(Debug, PartialEq, Eq, Clone, Copy)] 36 #[cfg_attr(feature = "defmt", derive(defmt::Format))] 37 pub enum RecvError { 38 Exhausted, 39 } 40 41 /// A User Datagram Protocol socket. 42 /// 43 /// A UDP socket is bound to a specific endpoint, and owns transmit and receive 44 /// packet buffers. 45 #[derive(Debug)] 46 pub struct Socket<'a> { 47 endpoint: IpListenEndpoint, 48 rx_buffer: PacketBuffer<'a>, 49 tx_buffer: PacketBuffer<'a>, 50 /// The time-to-live (IPv4) or hop limit (IPv6) value used in outgoing packets. 51 hop_limit: Option<u8>, 52 #[cfg(feature = "async")] 53 rx_waker: WakerRegistration, 54 #[cfg(feature = "async")] 55 tx_waker: WakerRegistration, 56 } 57 58 impl<'a> Socket<'a> { 59 /// Create an UDP socket with the given buffers. new(rx_buffer: PacketBuffer<'a>, tx_buffer: PacketBuffer<'a>) -> Socket<'a>60 pub fn new(rx_buffer: PacketBuffer<'a>, tx_buffer: PacketBuffer<'a>) -> Socket<'a> { 61 Socket { 62 endpoint: IpListenEndpoint::default(), 63 rx_buffer, 64 tx_buffer, 65 hop_limit: None, 66 #[cfg(feature = "async")] 67 rx_waker: WakerRegistration::new(), 68 #[cfg(feature = "async")] 69 tx_waker: WakerRegistration::new(), 70 } 71 } 72 73 /// Register a waker for receive operations. 74 /// 75 /// The waker is woken on state changes that might affect the return value 76 /// of `recv` method calls, such as receiving data, or the socket closing. 77 /// 78 /// Notes: 79 /// 80 /// - Only one waker can be registered at a time. If another waker was previously registered, 81 /// it is overwritten and will no longer be woken. 82 /// - The Waker is woken only once. Once woken, you must register it again to receive more wakes. 83 /// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `recv` has 84 /// necessarily changed. 85 #[cfg(feature = "async")] register_recv_waker(&mut self, waker: &Waker)86 pub fn register_recv_waker(&mut self, waker: &Waker) { 87 self.rx_waker.register(waker) 88 } 89 90 /// Register a waker for send operations. 91 /// 92 /// The waker is woken on state changes that might affect the return value 93 /// of `send` method calls, such as space becoming available in the transmit 94 /// buffer, or the socket closing. 95 /// 96 /// Notes: 97 /// 98 /// - Only one waker can be registered at a time. If another waker was previously registered, 99 /// it is overwritten and will no longer be woken. 100 /// - The Waker is woken only once. Once woken, you must register it again to receive more wakes. 101 /// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `send` has 102 /// necessarily changed. 103 #[cfg(feature = "async")] register_send_waker(&mut self, waker: &Waker)104 pub fn register_send_waker(&mut self, waker: &Waker) { 105 self.tx_waker.register(waker) 106 } 107 108 /// Return the bound endpoint. 109 #[inline] endpoint(&self) -> IpListenEndpoint110 pub fn endpoint(&self) -> IpListenEndpoint { 111 self.endpoint 112 } 113 114 /// Return the time-to-live (IPv4) or hop limit (IPv6) value used in outgoing packets. 115 /// 116 /// See also the [set_hop_limit](#method.set_hop_limit) method hop_limit(&self) -> Option<u8>117 pub fn hop_limit(&self) -> Option<u8> { 118 self.hop_limit 119 } 120 121 /// Set the time-to-live (IPv4) or hop limit (IPv6) value used in outgoing packets. 122 /// 123 /// A socket without an explicitly set hop limit value uses the default [IANA recommended] 124 /// value (64). 125 /// 126 /// # Panics 127 /// 128 /// This function panics if a hop limit value of 0 is given. See [RFC 1122 § 3.2.1.7]. 129 /// 130 /// [IANA recommended]: https://www.iana.org/assignments/ip-parameters/ip-parameters.xhtml 131 /// [RFC 1122 § 3.2.1.7]: https://tools.ietf.org/html/rfc1122#section-3.2.1.7 set_hop_limit(&mut self, hop_limit: Option<u8>)132 pub fn set_hop_limit(&mut self, hop_limit: Option<u8>) { 133 // A host MUST NOT send a datagram with a hop limit value of 0 134 if let Some(0) = hop_limit { 135 panic!("the time-to-live value of a packet must not be zero") 136 } 137 138 self.hop_limit = hop_limit 139 } 140 141 /// Bind the socket to the given endpoint. 142 /// 143 /// This function returns `Err(Error::Illegal)` if the socket was open 144 /// (see [is_open](#method.is_open)), and `Err(Error::Unaddressable)` 145 /// if the port in the given endpoint is zero. bind<T: Into<IpListenEndpoint>>(&mut self, endpoint: T) -> Result<(), BindError>146 pub fn bind<T: Into<IpListenEndpoint>>(&mut self, endpoint: T) -> Result<(), BindError> { 147 let endpoint = endpoint.into(); 148 if endpoint.port == 0 { 149 return Err(BindError::Unaddressable); 150 } 151 152 if self.is_open() { 153 return Err(BindError::InvalidState); 154 } 155 156 self.endpoint = endpoint; 157 158 #[cfg(feature = "async")] 159 { 160 self.rx_waker.wake(); 161 self.tx_waker.wake(); 162 } 163 164 Ok(()) 165 } 166 167 /// Close the socket. close(&mut self)168 pub fn close(&mut self) { 169 // Clear the bound endpoint of the socket. 170 self.endpoint = IpListenEndpoint::default(); 171 172 // Reset the RX and TX buffers of the socket. 173 self.tx_buffer.reset(); 174 self.rx_buffer.reset(); 175 176 #[cfg(feature = "async")] 177 { 178 self.rx_waker.wake(); 179 self.tx_waker.wake(); 180 } 181 } 182 183 /// Check whether the socket is open. 184 #[inline] is_open(&self) -> bool185 pub fn is_open(&self) -> bool { 186 self.endpoint.port != 0 187 } 188 189 /// Check whether the transmit buffer is full. 190 #[inline] can_send(&self) -> bool191 pub fn can_send(&self) -> bool { 192 !self.tx_buffer.is_full() 193 } 194 195 /// Check whether the receive buffer is not empty. 196 #[inline] can_recv(&self) -> bool197 pub fn can_recv(&self) -> bool { 198 !self.rx_buffer.is_empty() 199 } 200 201 /// Return the maximum number packets the socket can receive. 202 #[inline] packet_recv_capacity(&self) -> usize203 pub fn packet_recv_capacity(&self) -> usize { 204 self.rx_buffer.packet_capacity() 205 } 206 207 /// Return the maximum number packets the socket can transmit. 208 #[inline] packet_send_capacity(&self) -> usize209 pub fn packet_send_capacity(&self) -> usize { 210 self.tx_buffer.packet_capacity() 211 } 212 213 /// Return the maximum number of bytes inside the recv buffer. 214 #[inline] payload_recv_capacity(&self) -> usize215 pub fn payload_recv_capacity(&self) -> usize { 216 self.rx_buffer.payload_capacity() 217 } 218 219 /// Return the maximum number of bytes inside the transmit buffer. 220 #[inline] payload_send_capacity(&self) -> usize221 pub fn payload_send_capacity(&self) -> usize { 222 self.tx_buffer.payload_capacity() 223 } 224 225 /// Enqueue a packet to be sent to a given remote endpoint, and return a pointer 226 /// to its payload. 227 /// 228 /// This function returns `Err(Error::Exhausted)` if the transmit buffer is full, 229 /// `Err(Error::Unaddressable)` if local or remote port, or remote address are unspecified, 230 /// and `Err(Error::Truncated)` if there is not enough transmit buffer capacity 231 /// to ever send this packet. send( &mut self, size: usize, remote_endpoint: IpEndpoint, ) -> Result<&mut [u8], SendError>232 pub fn send( 233 &mut self, 234 size: usize, 235 remote_endpoint: IpEndpoint, 236 ) -> Result<&mut [u8], SendError> { 237 if self.endpoint.port == 0 { 238 return Err(SendError::Unaddressable); 239 } 240 if remote_endpoint.addr.is_unspecified() { 241 return Err(SendError::Unaddressable); 242 } 243 if remote_endpoint.port == 0 { 244 return Err(SendError::Unaddressable); 245 } 246 247 let payload_buf = self 248 .tx_buffer 249 .enqueue(size, remote_endpoint) 250 .map_err(|_| SendError::BufferFull)?; 251 252 net_trace!( 253 "udp:{}:{}: buffer to send {} octets", 254 self.endpoint, 255 remote_endpoint, 256 size 257 ); 258 Ok(payload_buf) 259 } 260 261 /// Enqueue a packet to be send to a given remote endpoint and pass the buffer 262 /// to the provided closure. The closure then returns the size of the data written 263 /// into the buffer. 264 /// 265 /// Also see [send](#method.send). send_with<F>( &mut self, max_size: usize, remote_endpoint: IpEndpoint, f: F, ) -> Result<usize, SendError> where F: FnOnce(&mut [u8]) -> usize,266 pub fn send_with<F>( 267 &mut self, 268 max_size: usize, 269 remote_endpoint: IpEndpoint, 270 f: F, 271 ) -> Result<usize, SendError> 272 where 273 F: FnOnce(&mut [u8]) -> usize, 274 { 275 if self.endpoint.port == 0 { 276 return Err(SendError::Unaddressable); 277 } 278 if remote_endpoint.addr.is_unspecified() { 279 return Err(SendError::Unaddressable); 280 } 281 if remote_endpoint.port == 0 { 282 return Err(SendError::Unaddressable); 283 } 284 285 let size = self 286 .tx_buffer 287 .enqueue_with_infallible(max_size, remote_endpoint, f) 288 .map_err(|_| SendError::BufferFull)?; 289 290 net_trace!( 291 "udp:{}:{}: buffer to send {} octets", 292 self.endpoint, 293 remote_endpoint, 294 size 295 ); 296 Ok(size) 297 } 298 299 /// Enqueue a packet to be sent to a given remote endpoint, and fill it from a slice. 300 /// 301 /// See also [send](#method.send). send_slice( &mut self, data: &[u8], remote_endpoint: IpEndpoint, ) -> Result<(), SendError>302 pub fn send_slice( 303 &mut self, 304 data: &[u8], 305 remote_endpoint: IpEndpoint, 306 ) -> Result<(), SendError> { 307 self.send(data.len(), remote_endpoint)? 308 .copy_from_slice(data); 309 Ok(()) 310 } 311 312 /// Dequeue a packet received from a remote endpoint, and return the endpoint as well 313 /// as a pointer to the payload. 314 /// 315 /// This function returns `Err(Error::Exhausted)` if the receive buffer is empty. recv(&mut self) -> Result<(&[u8], IpEndpoint), RecvError>316 pub fn recv(&mut self) -> Result<(&[u8], IpEndpoint), RecvError> { 317 let (remote_endpoint, payload_buf) = 318 self.rx_buffer.dequeue().map_err(|_| RecvError::Exhausted)?; 319 320 net_trace!( 321 "udp:{}:{}: receive {} buffered octets", 322 self.endpoint, 323 remote_endpoint, 324 payload_buf.len() 325 ); 326 Ok((payload_buf, remote_endpoint)) 327 } 328 329 /// Dequeue a packet received from a remote endpoint, copy the payload into the given slice, 330 /// and return the amount of octets copied as well as the endpoint. 331 /// 332 /// See also [recv](#method.recv). recv_slice(&mut self, data: &mut [u8]) -> Result<(usize, IpEndpoint), RecvError>333 pub fn recv_slice(&mut self, data: &mut [u8]) -> Result<(usize, IpEndpoint), RecvError> { 334 let (buffer, endpoint) = self.recv().map_err(|_| RecvError::Exhausted)?; 335 let length = min(data.len(), buffer.len()); 336 data[..length].copy_from_slice(&buffer[..length]); 337 Ok((length, endpoint)) 338 } 339 340 /// Peek at a packet received from a remote endpoint, and return the endpoint as well 341 /// as a pointer to the payload without removing the packet from the receive buffer. 342 /// This function otherwise behaves identically to [recv](#method.recv). 343 /// 344 /// It returns `Err(Error::Exhausted)` if the receive buffer is empty. peek(&mut self) -> Result<(&[u8], &IpEndpoint), RecvError>345 pub fn peek(&mut self) -> Result<(&[u8], &IpEndpoint), RecvError> { 346 let endpoint = self.endpoint; 347 self.rx_buffer.peek().map_err(|_| RecvError::Exhausted).map( 348 |(remote_endpoint, payload_buf)| { 349 net_trace!( 350 "udp:{}:{}: peek {} buffered octets", 351 endpoint, 352 remote_endpoint, 353 payload_buf.len() 354 ); 355 (payload_buf, remote_endpoint) 356 }, 357 ) 358 } 359 360 /// Peek at a packet received from a remote endpoint, copy the payload into the given slice, 361 /// and return the amount of octets copied as well as the endpoint without removing the 362 /// packet from the receive buffer. 363 /// This function otherwise behaves identically to [recv_slice](#method.recv_slice). 364 /// 365 /// See also [peek](#method.peek). peek_slice(&mut self, data: &mut [u8]) -> Result<(usize, &IpEndpoint), RecvError>366 pub fn peek_slice(&mut self, data: &mut [u8]) -> Result<(usize, &IpEndpoint), RecvError> { 367 let (buffer, endpoint) = self.peek()?; 368 let length = min(data.len(), buffer.len()); 369 data[..length].copy_from_slice(&buffer[..length]); 370 Ok((length, endpoint)) 371 } 372 accepts(&self, _cx: &mut Context, ip_repr: &IpRepr, repr: &UdpRepr) -> bool373 pub(crate) fn accepts(&self, _cx: &mut Context, ip_repr: &IpRepr, repr: &UdpRepr) -> bool { 374 if self.endpoint.port != repr.dst_port { 375 return false; 376 } 377 if self.endpoint.addr.is_some() 378 && self.endpoint.addr != Some(ip_repr.dst_addr()) 379 && !ip_repr.dst_addr().is_broadcast() 380 && !ip_repr.dst_addr().is_multicast() 381 { 382 return false; 383 } 384 385 true 386 } 387 process( &mut self, cx: &mut Context, ip_repr: &IpRepr, repr: &UdpRepr, payload: &[u8], )388 pub(crate) fn process( 389 &mut self, 390 cx: &mut Context, 391 ip_repr: &IpRepr, 392 repr: &UdpRepr, 393 payload: &[u8], 394 ) { 395 debug_assert!(self.accepts(cx, ip_repr, repr)); 396 397 let size = payload.len(); 398 399 let remote_endpoint = IpEndpoint { 400 addr: ip_repr.src_addr(), 401 port: repr.src_port, 402 }; 403 404 net_trace!( 405 "udp:{}:{}: receiving {} octets", 406 self.endpoint, 407 remote_endpoint, 408 size 409 ); 410 411 match self.rx_buffer.enqueue(size, remote_endpoint) { 412 Ok(buf) => buf.copy_from_slice(payload), 413 Err(_) => net_trace!( 414 "udp:{}:{}: buffer full, dropped incoming packet", 415 self.endpoint, 416 remote_endpoint 417 ), 418 } 419 420 #[cfg(feature = "async")] 421 self.rx_waker.wake(); 422 } 423 dispatch<F, E>(&mut self, cx: &mut Context, emit: F) -> Result<(), E> where F: FnOnce(&mut Context, (IpRepr, UdpRepr, &[u8])) -> Result<(), E>,424 pub(crate) fn dispatch<F, E>(&mut self, cx: &mut Context, emit: F) -> Result<(), E> 425 where 426 F: FnOnce(&mut Context, (IpRepr, UdpRepr, &[u8])) -> Result<(), E>, 427 { 428 let endpoint = self.endpoint; 429 let hop_limit = self.hop_limit.unwrap_or(64); 430 431 let res = self.tx_buffer.dequeue_with(|remote_endpoint, payload_buf| { 432 let src_addr = match endpoint.addr { 433 Some(addr) => addr, 434 None => match cx.get_source_address(remote_endpoint.addr) { 435 Some(addr) => addr, 436 None => { 437 net_trace!( 438 "udp:{}:{}: cannot find suitable source address, dropping.", 439 endpoint, 440 remote_endpoint 441 ); 442 return Ok(()); 443 } 444 }, 445 }; 446 447 net_trace!( 448 "udp:{}:{}: sending {} octets", 449 endpoint, 450 remote_endpoint, 451 payload_buf.len() 452 ); 453 454 let repr = UdpRepr { 455 src_port: endpoint.port, 456 dst_port: remote_endpoint.port, 457 }; 458 let ip_repr = IpRepr::new( 459 src_addr, 460 remote_endpoint.addr, 461 IpProtocol::Udp, 462 repr.header_len() + payload_buf.len(), 463 hop_limit, 464 ); 465 emit(cx, (ip_repr, repr, payload_buf)) 466 }); 467 match res { 468 Err(Empty) => Ok(()), 469 Ok(Err(e)) => Err(e), 470 Ok(Ok(())) => { 471 #[cfg(feature = "async")] 472 self.tx_waker.wake(); 473 Ok(()) 474 } 475 } 476 } 477 poll_at(&self, _cx: &mut Context) -> PollAt478 pub(crate) fn poll_at(&self, _cx: &mut Context) -> PollAt { 479 if self.tx_buffer.is_empty() { 480 PollAt::Ingress 481 } else { 482 PollAt::Now 483 } 484 } 485 } 486 487 #[cfg(test)] 488 mod test { 489 use super::*; 490 use crate::wire::{IpRepr, UdpRepr}; 491 buffer(packets: usize) -> PacketBuffer<'static>492 fn buffer(packets: usize) -> PacketBuffer<'static> { 493 PacketBuffer::new(vec![PacketMetadata::EMPTY; packets], vec![0; 16 * packets]) 494 } 495 socket( rx_buffer: PacketBuffer<'static>, tx_buffer: PacketBuffer<'static>, ) -> Socket<'static>496 fn socket( 497 rx_buffer: PacketBuffer<'static>, 498 tx_buffer: PacketBuffer<'static>, 499 ) -> Socket<'static> { 500 Socket::new(rx_buffer, tx_buffer) 501 } 502 503 const LOCAL_PORT: u16 = 53; 504 const REMOTE_PORT: u16 = 49500; 505 506 cfg_if::cfg_if! { 507 if #[cfg(feature = "proto-ipv4")] { 508 use crate::wire::Ipv4Address as IpvXAddress; 509 use crate::wire::Ipv4Repr as IpvXRepr; 510 use IpRepr::Ipv4 as IpReprIpvX; 511 512 const LOCAL_ADDR: IpvXAddress = IpvXAddress([192, 168, 1, 1]); 513 const REMOTE_ADDR: IpvXAddress = IpvXAddress([192, 168, 1, 2]); 514 const OTHER_ADDR: IpvXAddress = IpvXAddress([192, 168, 1, 3]); 515 } else { 516 use crate::wire::Ipv6Address as IpvXAddress; 517 use crate::wire::Ipv6Repr as IpvXRepr; 518 use IpRepr::Ipv6 as IpReprIpvX; 519 520 const LOCAL_ADDR: IpvXAddress = IpvXAddress([ 521 0xfe, 0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 522 ]); 523 const REMOTE_ADDR: IpvXAddress = IpvXAddress([ 524 0xfe, 0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 525 ]); 526 const OTHER_ADDR: IpvXAddress = IpvXAddress([ 527 0xfe, 0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 528 ]); 529 } 530 } 531 532 pub const LOCAL_END: IpEndpoint = IpEndpoint { 533 addr: LOCAL_ADDR.into_address(), 534 port: LOCAL_PORT, 535 }; 536 pub const REMOTE_END: IpEndpoint = IpEndpoint { 537 addr: REMOTE_ADDR.into_address(), 538 port: REMOTE_PORT, 539 }; 540 541 pub const LOCAL_IP_REPR: IpRepr = IpReprIpvX(IpvXRepr { 542 src_addr: LOCAL_ADDR, 543 dst_addr: REMOTE_ADDR, 544 next_header: IpProtocol::Udp, 545 payload_len: 8 + 6, 546 hop_limit: 64, 547 }); 548 549 pub const REMOTE_IP_REPR: IpRepr = IpReprIpvX(IpvXRepr { 550 src_addr: REMOTE_ADDR, 551 dst_addr: LOCAL_ADDR, 552 next_header: IpProtocol::Udp, 553 payload_len: 8 + 6, 554 hop_limit: 64, 555 }); 556 557 pub const BAD_IP_REPR: IpRepr = IpReprIpvX(IpvXRepr { 558 src_addr: REMOTE_ADDR, 559 dst_addr: OTHER_ADDR, 560 next_header: IpProtocol::Udp, 561 payload_len: 8 + 6, 562 hop_limit: 64, 563 }); 564 565 const LOCAL_UDP_REPR: UdpRepr = UdpRepr { 566 src_port: LOCAL_PORT, 567 dst_port: REMOTE_PORT, 568 }; 569 570 const REMOTE_UDP_REPR: UdpRepr = UdpRepr { 571 src_port: REMOTE_PORT, 572 dst_port: LOCAL_PORT, 573 }; 574 575 const PAYLOAD: &[u8] = b"abcdef"; 576 577 #[test] test_bind_unaddressable()578 fn test_bind_unaddressable() { 579 let mut socket = socket(buffer(0), buffer(0)); 580 assert_eq!(socket.bind(0), Err(BindError::Unaddressable)); 581 } 582 583 #[test] test_bind_twice()584 fn test_bind_twice() { 585 let mut socket = socket(buffer(0), buffer(0)); 586 assert_eq!(socket.bind(1), Ok(())); 587 assert_eq!(socket.bind(2), Err(BindError::InvalidState)); 588 } 589 590 #[test] 591 #[should_panic(expected = "the time-to-live value of a packet must not be zero")] test_set_hop_limit_zero()592 fn test_set_hop_limit_zero() { 593 let mut s = socket(buffer(0), buffer(1)); 594 s.set_hop_limit(Some(0)); 595 } 596 597 #[test] test_send_unaddressable()598 fn test_send_unaddressable() { 599 let mut socket = socket(buffer(0), buffer(1)); 600 601 assert_eq!( 602 socket.send_slice(b"abcdef", REMOTE_END), 603 Err(SendError::Unaddressable) 604 ); 605 assert_eq!(socket.bind(LOCAL_PORT), Ok(())); 606 assert_eq!( 607 socket.send_slice( 608 b"abcdef", 609 IpEndpoint { 610 addr: IpvXAddress::UNSPECIFIED.into(), 611 ..REMOTE_END 612 } 613 ), 614 Err(SendError::Unaddressable) 615 ); 616 assert_eq!( 617 socket.send_slice( 618 b"abcdef", 619 IpEndpoint { 620 port: 0, 621 ..REMOTE_END 622 } 623 ), 624 Err(SendError::Unaddressable) 625 ); 626 assert_eq!(socket.send_slice(b"abcdef", REMOTE_END), Ok(())); 627 } 628 629 #[test] test_send_dispatch()630 fn test_send_dispatch() { 631 let mut socket = socket(buffer(0), buffer(1)); 632 let mut cx = Context::mock(); 633 634 assert_eq!(socket.bind(LOCAL_END), Ok(())); 635 636 assert!(socket.can_send()); 637 assert_eq!( 638 socket.dispatch(&mut cx, |_, _| unreachable!()), 639 Ok::<_, ()>(()) 640 ); 641 642 assert_eq!(socket.send_slice(b"abcdef", REMOTE_END), Ok(())); 643 assert_eq!( 644 socket.send_slice(b"123456", REMOTE_END), 645 Err(SendError::BufferFull) 646 ); 647 assert!(!socket.can_send()); 648 649 assert_eq!( 650 socket.dispatch(&mut cx, |_, (ip_repr, udp_repr, payload)| { 651 assert_eq!(ip_repr, LOCAL_IP_REPR); 652 assert_eq!(udp_repr, LOCAL_UDP_REPR); 653 assert_eq!(payload, PAYLOAD); 654 Err(()) 655 }), 656 Err(()) 657 ); 658 assert!(!socket.can_send()); 659 660 assert_eq!( 661 socket.dispatch(&mut cx, |_, (ip_repr, udp_repr, payload)| { 662 assert_eq!(ip_repr, LOCAL_IP_REPR); 663 assert_eq!(udp_repr, LOCAL_UDP_REPR); 664 assert_eq!(payload, PAYLOAD); 665 Ok::<_, ()>(()) 666 }), 667 Ok(()) 668 ); 669 assert!(socket.can_send()); 670 } 671 672 #[test] test_recv_process()673 fn test_recv_process() { 674 let mut socket = socket(buffer(1), buffer(0)); 675 let mut cx = Context::mock(); 676 677 assert_eq!(socket.bind(LOCAL_PORT), Ok(())); 678 679 assert!(!socket.can_recv()); 680 assert_eq!(socket.recv(), Err(RecvError::Exhausted)); 681 682 assert!(socket.accepts(&mut cx, &REMOTE_IP_REPR, &REMOTE_UDP_REPR)); 683 socket.process(&mut cx, &REMOTE_IP_REPR, &REMOTE_UDP_REPR, PAYLOAD); 684 assert!(socket.can_recv()); 685 686 assert!(socket.accepts(&mut cx, &REMOTE_IP_REPR, &REMOTE_UDP_REPR)); 687 socket.process(&mut cx, &REMOTE_IP_REPR, &REMOTE_UDP_REPR, PAYLOAD); 688 689 assert_eq!(socket.recv(), Ok((&b"abcdef"[..], REMOTE_END))); 690 assert!(!socket.can_recv()); 691 } 692 693 #[test] test_peek_process()694 fn test_peek_process() { 695 let mut socket = socket(buffer(1), buffer(0)); 696 let mut cx = Context::mock(); 697 698 assert_eq!(socket.bind(LOCAL_PORT), Ok(())); 699 700 assert_eq!(socket.peek(), Err(RecvError::Exhausted)); 701 702 socket.process(&mut cx, &REMOTE_IP_REPR, &REMOTE_UDP_REPR, PAYLOAD); 703 assert_eq!(socket.peek(), Ok((&b"abcdef"[..], &REMOTE_END))); 704 assert_eq!(socket.recv(), Ok((&b"abcdef"[..], REMOTE_END))); 705 assert_eq!(socket.peek(), Err(RecvError::Exhausted)); 706 } 707 708 #[test] test_recv_truncated_slice()709 fn test_recv_truncated_slice() { 710 let mut socket = socket(buffer(1), buffer(0)); 711 let mut cx = Context::mock(); 712 713 assert_eq!(socket.bind(LOCAL_PORT), Ok(())); 714 715 assert!(socket.accepts(&mut cx, &REMOTE_IP_REPR, &REMOTE_UDP_REPR)); 716 socket.process(&mut cx, &REMOTE_IP_REPR, &REMOTE_UDP_REPR, PAYLOAD); 717 718 let mut slice = [0; 4]; 719 assert_eq!(socket.recv_slice(&mut slice[..]), Ok((4, REMOTE_END))); 720 assert_eq!(&slice, b"abcd"); 721 } 722 723 #[test] test_peek_truncated_slice()724 fn test_peek_truncated_slice() { 725 let mut socket = socket(buffer(1), buffer(0)); 726 let mut cx = Context::mock(); 727 728 assert_eq!(socket.bind(LOCAL_PORT), Ok(())); 729 730 socket.process(&mut cx, &REMOTE_IP_REPR, &REMOTE_UDP_REPR, PAYLOAD); 731 732 let mut slice = [0; 4]; 733 assert_eq!(socket.peek_slice(&mut slice[..]), Ok((4, &REMOTE_END))); 734 assert_eq!(&slice, b"abcd"); 735 assert_eq!(socket.recv_slice(&mut slice[..]), Ok((4, REMOTE_END))); 736 assert_eq!(&slice, b"abcd"); 737 assert_eq!(socket.peek_slice(&mut slice[..]), Err(RecvError::Exhausted)); 738 } 739 740 #[test] test_set_hop_limit()741 fn test_set_hop_limit() { 742 let mut s = socket(buffer(0), buffer(1)); 743 let mut cx = Context::mock(); 744 745 assert_eq!(s.bind(LOCAL_END), Ok(())); 746 747 s.set_hop_limit(Some(0x2a)); 748 assert_eq!(s.send_slice(b"abcdef", REMOTE_END), Ok(())); 749 assert_eq!( 750 s.dispatch(&mut cx, |_, (ip_repr, _, _)| { 751 assert_eq!( 752 ip_repr, 753 IpReprIpvX(IpvXRepr { 754 src_addr: LOCAL_ADDR, 755 dst_addr: REMOTE_ADDR, 756 next_header: IpProtocol::Udp, 757 payload_len: 8 + 6, 758 hop_limit: 0x2a, 759 }) 760 ); 761 Ok::<_, ()>(()) 762 }), 763 Ok(()) 764 ); 765 } 766 767 #[test] test_doesnt_accept_wrong_port()768 fn test_doesnt_accept_wrong_port() { 769 let mut socket = socket(buffer(1), buffer(0)); 770 let mut cx = Context::mock(); 771 772 assert_eq!(socket.bind(LOCAL_PORT), Ok(())); 773 774 let mut udp_repr = REMOTE_UDP_REPR; 775 assert!(socket.accepts(&mut cx, &REMOTE_IP_REPR, &udp_repr)); 776 udp_repr.dst_port += 1; 777 assert!(!socket.accepts(&mut cx, &REMOTE_IP_REPR, &udp_repr)); 778 } 779 780 #[test] test_doesnt_accept_wrong_ip()781 fn test_doesnt_accept_wrong_ip() { 782 let mut cx = Context::mock(); 783 784 let mut port_bound_socket = socket(buffer(1), buffer(0)); 785 assert_eq!(port_bound_socket.bind(LOCAL_PORT), Ok(())); 786 assert!(port_bound_socket.accepts(&mut cx, &BAD_IP_REPR, &REMOTE_UDP_REPR)); 787 788 let mut ip_bound_socket = socket(buffer(1), buffer(0)); 789 assert_eq!(ip_bound_socket.bind(LOCAL_END), Ok(())); 790 assert!(!ip_bound_socket.accepts(&mut cx, &BAD_IP_REPR, &REMOTE_UDP_REPR)); 791 } 792 793 #[test] test_send_large_packet()794 fn test_send_large_packet() { 795 // buffer(4) creates a payload buffer of size 16*4 796 let mut socket = socket(buffer(0), buffer(4)); 797 assert_eq!(socket.bind(LOCAL_END), Ok(())); 798 799 let too_large = b"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdefx"; 800 assert_eq!( 801 socket.send_slice(too_large, REMOTE_END), 802 Err(SendError::BufferFull) 803 ); 804 assert_eq!(socket.send_slice(&too_large[..16 * 4], REMOTE_END), Ok(())); 805 } 806 807 #[test] test_process_empty_payload()808 fn test_process_empty_payload() { 809 let recv_buffer = PacketBuffer::new(vec![PacketMetadata::EMPTY; 1], vec![]); 810 let mut socket = socket(recv_buffer, buffer(0)); 811 let mut cx = Context::mock(); 812 813 assert_eq!(socket.bind(LOCAL_PORT), Ok(())); 814 815 let repr = UdpRepr { 816 src_port: REMOTE_PORT, 817 dst_port: LOCAL_PORT, 818 }; 819 socket.process(&mut cx, &REMOTE_IP_REPR, &repr, &[]); 820 assert_eq!(socket.recv(), Ok((&[][..], REMOTE_END))); 821 } 822 823 #[test] test_closing()824 fn test_closing() { 825 let recv_buffer = PacketBuffer::new(vec![PacketMetadata::EMPTY; 1], vec![]); 826 let mut socket = socket(recv_buffer, buffer(0)); 827 assert_eq!(socket.bind(LOCAL_PORT), Ok(())); 828 829 assert!(socket.is_open()); 830 socket.close(); 831 assert!(!socket.is_open()); 832 } 833 } 834