1 use managed::ManagedSlice; 2 3 use crate::storage::{Full, RingBuffer}; 4 5 use super::Empty; 6 7 /// Size and header of a packet. 8 #[derive(Debug, Clone, Copy)] 9 #[cfg_attr(feature = "defmt", derive(defmt::Format))] 10 pub struct PacketMetadata<H> { 11 size: usize, 12 header: Option<H>, 13 } 14 15 impl<H> PacketMetadata<H> { 16 /// Empty packet description. 17 pub const EMPTY: PacketMetadata<H> = PacketMetadata { 18 size: 0, 19 header: None, 20 }; 21 padding(size: usize) -> PacketMetadata<H>22 fn padding(size: usize) -> PacketMetadata<H> { 23 PacketMetadata { 24 size: size, 25 header: None, 26 } 27 } 28 packet(size: usize, header: H) -> PacketMetadata<H>29 fn packet(size: usize, header: H) -> PacketMetadata<H> { 30 PacketMetadata { 31 size: size, 32 header: Some(header), 33 } 34 } 35 is_padding(&self) -> bool36 fn is_padding(&self) -> bool { 37 self.header.is_none() 38 } 39 } 40 41 /// An UDP packet ring buffer. 42 #[derive(Debug)] 43 pub struct PacketBuffer<'a, H: 'a> { 44 metadata_ring: RingBuffer<'a, PacketMetadata<H>>, 45 payload_ring: RingBuffer<'a, u8>, 46 } 47 48 impl<'a, H> PacketBuffer<'a, H> { 49 /// Create a new packet buffer with the provided metadata and payload storage. 50 /// 51 /// Metadata storage limits the maximum _number_ of packets in the buffer and payload 52 /// storage limits the maximum _total size_ of packets. new<MS, PS>(metadata_storage: MS, payload_storage: PS) -> PacketBuffer<'a, H> where MS: Into<ManagedSlice<'a, PacketMetadata<H>>>, PS: Into<ManagedSlice<'a, u8>>,53 pub fn new<MS, PS>(metadata_storage: MS, payload_storage: PS) -> PacketBuffer<'a, H> 54 where 55 MS: Into<ManagedSlice<'a, PacketMetadata<H>>>, 56 PS: Into<ManagedSlice<'a, u8>>, 57 { 58 PacketBuffer { 59 metadata_ring: RingBuffer::new(metadata_storage), 60 payload_ring: RingBuffer::new(payload_storage), 61 } 62 } 63 64 /// Query whether the buffer is empty. is_empty(&self) -> bool65 pub fn is_empty(&self) -> bool { 66 self.metadata_ring.is_empty() 67 } 68 69 /// Query whether the buffer is full. is_full(&self) -> bool70 pub fn is_full(&self) -> bool { 71 self.metadata_ring.is_full() 72 } 73 74 // There is currently no enqueue_with() because of the complexity of managing padding 75 // in case of failure. 76 77 /// Enqueue a single packet with the given header into the buffer, and 78 /// return a reference to its payload, or return `Err(Full)` 79 /// if the buffer is full. enqueue(&mut self, size: usize, header: H) -> Result<&mut [u8], Full>80 pub fn enqueue(&mut self, size: usize, header: H) -> Result<&mut [u8], Full> { 81 if self.payload_ring.capacity() < size || self.metadata_ring.is_full() { 82 return Err(Full); 83 } 84 85 // Ring is currently empty. Clear it (resetting `read_at`) to maximize 86 // for contiguous space. 87 if self.payload_ring.is_empty() { 88 self.payload_ring.clear(); 89 } 90 91 let window = self.payload_ring.window(); 92 let contig_window = self.payload_ring.contiguous_window(); 93 94 if window < size { 95 return Err(Full); 96 } else if contig_window < size { 97 if window - contig_window < size { 98 // The buffer length is larger than the current contiguous window 99 // and is larger than the contiguous window will be after adding 100 // the padding necessary to circle around to the beginning of the 101 // ring buffer. 102 return Err(Full); 103 } else { 104 // Add padding to the end of the ring buffer so that the 105 // contiguous window is at the beginning of the ring buffer. 106 *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(contig_window); 107 // note(discard): function does not write to the result 108 // enqueued padding buffer location 109 let _buf_enqueued = self.payload_ring.enqueue_many(contig_window); 110 } 111 } 112 113 *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header); 114 115 let payload_buf = self.payload_ring.enqueue_many(size); 116 debug_assert!(payload_buf.len() == size); 117 Ok(payload_buf) 118 } 119 120 /// Call `f` with a packet from the buffer large enough to fit `max_size` bytes. The packet 121 /// is shrunk to the size returned from `f` and enqueued into the buffer. enqueue_with_infallible<'b, F>( &'b mut self, max_size: usize, header: H, f: F, ) -> Result<usize, Full> where F: FnOnce(&'b mut [u8]) -> usize,122 pub fn enqueue_with_infallible<'b, F>( 123 &'b mut self, 124 max_size: usize, 125 header: H, 126 f: F, 127 ) -> Result<usize, Full> 128 where 129 F: FnOnce(&'b mut [u8]) -> usize, 130 { 131 if self.payload_ring.capacity() < max_size || self.metadata_ring.is_full() { 132 return Err(Full); 133 } 134 135 let window = self.payload_ring.window(); 136 let contig_window = self.payload_ring.contiguous_window(); 137 138 if window < max_size { 139 return Err(Full); 140 } else if contig_window < max_size { 141 if window - contig_window < max_size { 142 // The buffer length is larger than the current contiguous window 143 // and is larger than the contiguous window will be after adding 144 // the padding necessary to circle around to the beginning of the 145 // ring buffer. 146 return Err(Full); 147 } else { 148 // Add padding to the end of the ring buffer so that the 149 // contiguous window is at the beginning of the ring buffer. 150 *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(contig_window); 151 // note(discard): function does not write to the result 152 // enqueued padding buffer location 153 let _buf_enqueued = self.payload_ring.enqueue_many(contig_window); 154 } 155 } 156 157 let (size, _) = self 158 .payload_ring 159 .enqueue_many_with(|data| (f(&mut data[..max_size]), ())); 160 161 *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header); 162 163 Ok(size) 164 } 165 dequeue_padding(&mut self)166 fn dequeue_padding(&mut self) { 167 let _ = self.metadata_ring.dequeue_one_with(|metadata| { 168 if metadata.is_padding() { 169 // note(discard): function does not use value of dequeued padding bytes 170 let _buf_dequeued = self.payload_ring.dequeue_many(metadata.size); 171 Ok(()) // dequeue metadata 172 } else { 173 Err(()) // don't dequeue metadata 174 } 175 }); 176 } 177 178 /// Call `f` with a single packet from the buffer, and dequeue the packet if `f` 179 /// returns successfully, or return `Err(EmptyError)` if the buffer is empty. dequeue_with<'c, R, E, F>(&'c mut self, f: F) -> Result<Result<R, E>, Empty> where F: FnOnce(&mut H, &'c mut [u8]) -> Result<R, E>,180 pub fn dequeue_with<'c, R, E, F>(&'c mut self, f: F) -> Result<Result<R, E>, Empty> 181 where 182 F: FnOnce(&mut H, &'c mut [u8]) -> Result<R, E>, 183 { 184 self.dequeue_padding(); 185 186 self.metadata_ring.dequeue_one_with(|metadata| { 187 self.payload_ring 188 .dequeue_many_with(|payload_buf| { 189 debug_assert!(payload_buf.len() >= metadata.size); 190 191 match f( 192 metadata.header.as_mut().unwrap(), 193 &mut payload_buf[..metadata.size], 194 ) { 195 Ok(val) => (metadata.size, Ok(val)), 196 Err(err) => (0, Err(err)), 197 } 198 }) 199 .1 200 }) 201 } 202 203 /// Dequeue a single packet from the buffer, and return a reference to its payload 204 /// as well as its header, or return `Err(Error::Exhausted)` if the buffer is empty. dequeue(&mut self) -> Result<(H, &mut [u8]), Empty>205 pub fn dequeue(&mut self) -> Result<(H, &mut [u8]), Empty> { 206 self.dequeue_padding(); 207 208 let meta = self.metadata_ring.dequeue_one()?; 209 210 let payload_buf = self.payload_ring.dequeue_many(meta.size); 211 debug_assert!(payload_buf.len() == meta.size); 212 Ok((meta.header.take().unwrap(), payload_buf)) 213 } 214 215 /// Peek at a single packet from the buffer without removing it, and return a reference to 216 /// its payload as well as its header, or return `Err(Error:Exhausted)` if the buffer is empty. 217 /// 218 /// This function otherwise behaves identically to [dequeue](#method.dequeue). peek(&mut self) -> Result<(&H, &[u8]), Empty>219 pub fn peek(&mut self) -> Result<(&H, &[u8]), Empty> { 220 self.dequeue_padding(); 221 222 if let Some(metadata) = self.metadata_ring.get_allocated(0, 1).first() { 223 Ok(( 224 metadata.header.as_ref().unwrap(), 225 self.payload_ring.get_allocated(0, metadata.size), 226 )) 227 } else { 228 Err(Empty) 229 } 230 } 231 232 /// Return the maximum number packets that can be stored. packet_capacity(&self) -> usize233 pub fn packet_capacity(&self) -> usize { 234 self.metadata_ring.capacity() 235 } 236 237 /// Return the maximum number of bytes in the payload ring buffer. payload_capacity(&self) -> usize238 pub fn payload_capacity(&self) -> usize { 239 self.payload_ring.capacity() 240 } 241 242 /// Reset the packet buffer and clear any staged. 243 #[allow(unused)] reset(&mut self)244 pub(crate) fn reset(&mut self) { 245 self.payload_ring.clear(); 246 self.metadata_ring.clear(); 247 } 248 } 249 250 #[cfg(test)] 251 mod test { 252 use super::*; 253 buffer() -> PacketBuffer<'static, ()>254 fn buffer() -> PacketBuffer<'static, ()> { 255 PacketBuffer::new(vec![PacketMetadata::EMPTY; 4], vec![0u8; 16]) 256 } 257 258 #[test] test_simple()259 fn test_simple() { 260 let mut buffer = buffer(); 261 buffer.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef"); 262 assert_eq!(buffer.enqueue(16, ()), Err(Full)); 263 assert_eq!(buffer.metadata_ring.len(), 1); 264 assert_eq!(buffer.dequeue().unwrap().1, &b"abcdef"[..]); 265 assert_eq!(buffer.dequeue(), Err(Empty)); 266 } 267 268 #[test] test_peek()269 fn test_peek() { 270 let mut buffer = buffer(); 271 assert_eq!(buffer.peek(), Err(Empty)); 272 buffer.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef"); 273 assert_eq!(buffer.metadata_ring.len(), 1); 274 assert_eq!(buffer.peek().unwrap().1, &b"abcdef"[..]); 275 assert_eq!(buffer.dequeue().unwrap().1, &b"abcdef"[..]); 276 assert_eq!(buffer.peek(), Err(Empty)); 277 } 278 279 #[test] test_padding()280 fn test_padding() { 281 let mut buffer = buffer(); 282 assert!(buffer.enqueue(6, ()).is_ok()); 283 assert!(buffer.enqueue(8, ()).is_ok()); 284 assert!(buffer.dequeue().is_ok()); 285 buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd"); 286 assert_eq!(buffer.metadata_ring.len(), 3); 287 assert!(buffer.dequeue().is_ok()); 288 289 assert_eq!(buffer.dequeue().unwrap().1, &b"abcd"[..]); 290 assert_eq!(buffer.metadata_ring.len(), 0); 291 } 292 293 #[test] test_padding_with_large_payload()294 fn test_padding_with_large_payload() { 295 let mut buffer = buffer(); 296 assert!(buffer.enqueue(12, ()).is_ok()); 297 assert!(buffer.dequeue().is_ok()); 298 buffer 299 .enqueue(12, ()) 300 .unwrap() 301 .copy_from_slice(b"abcdefghijkl"); 302 } 303 304 #[test] test_dequeue_with()305 fn test_dequeue_with() { 306 let mut buffer = buffer(); 307 assert!(buffer.enqueue(6, ()).is_ok()); 308 assert!(buffer.enqueue(8, ()).is_ok()); 309 assert!(buffer.dequeue().is_ok()); 310 buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd"); 311 assert_eq!(buffer.metadata_ring.len(), 3); 312 assert!(buffer.dequeue().is_ok()); 313 314 assert!(matches!( 315 buffer.dequeue_with(|_, _| Result::<(), u32>::Err(123)), 316 Ok(Err(_)) 317 )); 318 assert_eq!(buffer.metadata_ring.len(), 1); 319 320 assert!(buffer 321 .dequeue_with(|&mut (), payload| { 322 assert_eq!(payload, &b"abcd"[..]); 323 Result::<(), ()>::Ok(()) 324 }) 325 .is_ok()); 326 assert_eq!(buffer.metadata_ring.len(), 0); 327 } 328 329 #[test] test_metadata_full_empty()330 fn test_metadata_full_empty() { 331 let mut buffer = buffer(); 332 assert!(buffer.is_empty()); 333 assert!(!buffer.is_full()); 334 assert!(buffer.enqueue(1, ()).is_ok()); 335 assert!(!buffer.is_empty()); 336 assert!(buffer.enqueue(1, ()).is_ok()); 337 assert!(buffer.enqueue(1, ()).is_ok()); 338 assert!(!buffer.is_full()); 339 assert!(!buffer.is_empty()); 340 assert!(buffer.enqueue(1, ()).is_ok()); 341 assert!(buffer.is_full()); 342 assert!(!buffer.is_empty()); 343 assert_eq!(buffer.metadata_ring.len(), 4); 344 assert_eq!(buffer.enqueue(1, ()), Err(Full)); 345 } 346 347 #[test] test_window_too_small()348 fn test_window_too_small() { 349 let mut buffer = buffer(); 350 assert!(buffer.enqueue(4, ()).is_ok()); 351 assert!(buffer.enqueue(8, ()).is_ok()); 352 assert!(buffer.dequeue().is_ok()); 353 assert_eq!(buffer.enqueue(16, ()), Err(Full)); 354 assert_eq!(buffer.metadata_ring.len(), 1); 355 } 356 357 #[test] test_contiguous_window_too_small()358 fn test_contiguous_window_too_small() { 359 let mut buffer = buffer(); 360 assert!(buffer.enqueue(4, ()).is_ok()); 361 assert!(buffer.enqueue(8, ()).is_ok()); 362 assert!(buffer.dequeue().is_ok()); 363 assert_eq!(buffer.enqueue(8, ()), Err(Full)); 364 assert_eq!(buffer.metadata_ring.len(), 1); 365 } 366 367 #[test] test_contiguous_window_wrap()368 fn test_contiguous_window_wrap() { 369 let mut buffer = buffer(); 370 assert!(buffer.enqueue(15, ()).is_ok()); 371 assert!(buffer.dequeue().is_ok()); 372 assert!(buffer.enqueue(16, ()).is_ok()); 373 } 374 375 #[test] test_capacity_too_small()376 fn test_capacity_too_small() { 377 let mut buffer = buffer(); 378 assert_eq!(buffer.enqueue(32, ()), Err(Full)); 379 } 380 381 #[test] test_contig_window_prioritized()382 fn test_contig_window_prioritized() { 383 let mut buffer = buffer(); 384 assert!(buffer.enqueue(4, ()).is_ok()); 385 assert!(buffer.dequeue().is_ok()); 386 assert!(buffer.enqueue(5, ()).is_ok()); 387 } 388 389 #[test] clear()390 fn clear() { 391 let mut buffer = buffer(); 392 393 // Ensure enqueuing data in teh buffer fills it somewhat. 394 assert!(buffer.is_empty()); 395 assert!(buffer.enqueue(6, ()).is_ok()); 396 397 // Ensure that resetting the buffer causes it to be empty. 398 assert!(!buffer.is_empty()); 399 buffer.reset(); 400 assert!(buffer.is_empty()); 401 } 402 } 403