1 use managed::ManagedSlice;
2 
3 use crate::storage::{Full, RingBuffer};
4 
5 use super::Empty;
6 
7 /// Size and header of a packet.
8 #[derive(Debug, Clone, Copy)]
9 #[cfg_attr(feature = "defmt", derive(defmt::Format))]
10 pub struct PacketMetadata<H> {
11     size: usize,
12     header: Option<H>,
13 }
14 
15 impl<H> PacketMetadata<H> {
16     /// Empty packet description.
17     pub const EMPTY: PacketMetadata<H> = PacketMetadata {
18         size: 0,
19         header: None,
20     };
21 
padding(size: usize) -> PacketMetadata<H>22     fn padding(size: usize) -> PacketMetadata<H> {
23         PacketMetadata {
24             size: size,
25             header: None,
26         }
27     }
28 
packet(size: usize, header: H) -> PacketMetadata<H>29     fn packet(size: usize, header: H) -> PacketMetadata<H> {
30         PacketMetadata {
31             size: size,
32             header: Some(header),
33         }
34     }
35 
is_padding(&self) -> bool36     fn is_padding(&self) -> bool {
37         self.header.is_none()
38     }
39 }
40 
41 /// An UDP packet ring buffer.
42 #[derive(Debug)]
43 pub struct PacketBuffer<'a, H: 'a> {
44     metadata_ring: RingBuffer<'a, PacketMetadata<H>>,
45     payload_ring: RingBuffer<'a, u8>,
46 }
47 
48 impl<'a, H> PacketBuffer<'a, H> {
49     /// Create a new packet buffer with the provided metadata and payload storage.
50     ///
51     /// Metadata storage limits the maximum _number_ of packets in the buffer and payload
52     /// storage limits the maximum _total size_ of packets.
new<MS, PS>(metadata_storage: MS, payload_storage: PS) -> PacketBuffer<'a, H> where MS: Into<ManagedSlice<'a, PacketMetadata<H>>>, PS: Into<ManagedSlice<'a, u8>>,53     pub fn new<MS, PS>(metadata_storage: MS, payload_storage: PS) -> PacketBuffer<'a, H>
54     where
55         MS: Into<ManagedSlice<'a, PacketMetadata<H>>>,
56         PS: Into<ManagedSlice<'a, u8>>,
57     {
58         PacketBuffer {
59             metadata_ring: RingBuffer::new(metadata_storage),
60             payload_ring: RingBuffer::new(payload_storage),
61         }
62     }
63 
64     /// Query whether the buffer is empty.
is_empty(&self) -> bool65     pub fn is_empty(&self) -> bool {
66         self.metadata_ring.is_empty()
67     }
68 
69     /// Query whether the buffer is full.
is_full(&self) -> bool70     pub fn is_full(&self) -> bool {
71         self.metadata_ring.is_full()
72     }
73 
74     // There is currently no enqueue_with() because of the complexity of managing padding
75     // in case of failure.
76 
77     /// Enqueue a single packet with the given header into the buffer, and
78     /// return a reference to its payload, or return `Err(Full)`
79     /// if the buffer is full.
enqueue(&mut self, size: usize, header: H) -> Result<&mut [u8], Full>80     pub fn enqueue(&mut self, size: usize, header: H) -> Result<&mut [u8], Full> {
81         if self.payload_ring.capacity() < size || self.metadata_ring.is_full() {
82             return Err(Full);
83         }
84 
85         // Ring is currently empty.  Clear it (resetting `read_at`) to maximize
86         // for contiguous space.
87         if self.payload_ring.is_empty() {
88             self.payload_ring.clear();
89         }
90 
91         let window = self.payload_ring.window();
92         let contig_window = self.payload_ring.contiguous_window();
93 
94         if window < size {
95             return Err(Full);
96         } else if contig_window < size {
97             if window - contig_window < size {
98                 // The buffer length is larger than the current contiguous window
99                 // and is larger than the contiguous window will be after adding
100                 // the padding necessary to circle around to the beginning of the
101                 // ring buffer.
102                 return Err(Full);
103             } else {
104                 // Add padding to the end of the ring buffer so that the
105                 // contiguous window is at the beginning of the ring buffer.
106                 *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(contig_window);
107                 // note(discard): function does not write to the result
108                 // enqueued padding buffer location
109                 let _buf_enqueued = self.payload_ring.enqueue_many(contig_window);
110             }
111         }
112 
113         *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header);
114 
115         let payload_buf = self.payload_ring.enqueue_many(size);
116         debug_assert!(payload_buf.len() == size);
117         Ok(payload_buf)
118     }
119 
120     /// Call `f` with a packet from the buffer large enough to fit `max_size` bytes. The packet
121     /// is shrunk to the size returned from `f` and enqueued into the buffer.
enqueue_with_infallible<'b, F>( &'b mut self, max_size: usize, header: H, f: F, ) -> Result<usize, Full> where F: FnOnce(&'b mut [u8]) -> usize,122     pub fn enqueue_with_infallible<'b, F>(
123         &'b mut self,
124         max_size: usize,
125         header: H,
126         f: F,
127     ) -> Result<usize, Full>
128     where
129         F: FnOnce(&'b mut [u8]) -> usize,
130     {
131         if self.payload_ring.capacity() < max_size || self.metadata_ring.is_full() {
132             return Err(Full);
133         }
134 
135         let window = self.payload_ring.window();
136         let contig_window = self.payload_ring.contiguous_window();
137 
138         if window < max_size {
139             return Err(Full);
140         } else if contig_window < max_size {
141             if window - contig_window < max_size {
142                 // The buffer length is larger than the current contiguous window
143                 // and is larger than the contiguous window will be after adding
144                 // the padding necessary to circle around to the beginning of the
145                 // ring buffer.
146                 return Err(Full);
147             } else {
148                 // Add padding to the end of the ring buffer so that the
149                 // contiguous window is at the beginning of the ring buffer.
150                 *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(contig_window);
151                 // note(discard): function does not write to the result
152                 // enqueued padding buffer location
153                 let _buf_enqueued = self.payload_ring.enqueue_many(contig_window);
154             }
155         }
156 
157         let (size, _) = self
158             .payload_ring
159             .enqueue_many_with(|data| (f(&mut data[..max_size]), ()));
160 
161         *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header);
162 
163         Ok(size)
164     }
165 
dequeue_padding(&mut self)166     fn dequeue_padding(&mut self) {
167         let _ = self.metadata_ring.dequeue_one_with(|metadata| {
168             if metadata.is_padding() {
169                 // note(discard): function does not use value of dequeued padding bytes
170                 let _buf_dequeued = self.payload_ring.dequeue_many(metadata.size);
171                 Ok(()) // dequeue metadata
172             } else {
173                 Err(()) // don't dequeue metadata
174             }
175         });
176     }
177 
178     /// Call `f` with a single packet from the buffer, and dequeue the packet if `f`
179     /// returns successfully, or return `Err(EmptyError)` if the buffer is empty.
dequeue_with<'c, R, E, F>(&'c mut self, f: F) -> Result<Result<R, E>, Empty> where F: FnOnce(&mut H, &'c mut [u8]) -> Result<R, E>,180     pub fn dequeue_with<'c, R, E, F>(&'c mut self, f: F) -> Result<Result<R, E>, Empty>
181     where
182         F: FnOnce(&mut H, &'c mut [u8]) -> Result<R, E>,
183     {
184         self.dequeue_padding();
185 
186         self.metadata_ring.dequeue_one_with(|metadata| {
187             self.payload_ring
188                 .dequeue_many_with(|payload_buf| {
189                     debug_assert!(payload_buf.len() >= metadata.size);
190 
191                     match f(
192                         metadata.header.as_mut().unwrap(),
193                         &mut payload_buf[..metadata.size],
194                     ) {
195                         Ok(val) => (metadata.size, Ok(val)),
196                         Err(err) => (0, Err(err)),
197                     }
198                 })
199                 .1
200         })
201     }
202 
203     /// Dequeue a single packet from the buffer, and return a reference to its payload
204     /// as well as its header, or return `Err(Error::Exhausted)` if the buffer is empty.
dequeue(&mut self) -> Result<(H, &mut [u8]), Empty>205     pub fn dequeue(&mut self) -> Result<(H, &mut [u8]), Empty> {
206         self.dequeue_padding();
207 
208         let meta = self.metadata_ring.dequeue_one()?;
209 
210         let payload_buf = self.payload_ring.dequeue_many(meta.size);
211         debug_assert!(payload_buf.len() == meta.size);
212         Ok((meta.header.take().unwrap(), payload_buf))
213     }
214 
215     /// Peek at a single packet from the buffer without removing it, and return a reference to
216     /// its payload as well as its header, or return `Err(Error:Exhausted)` if the buffer is empty.
217     ///
218     /// This function otherwise behaves identically to [dequeue](#method.dequeue).
peek(&mut self) -> Result<(&H, &[u8]), Empty>219     pub fn peek(&mut self) -> Result<(&H, &[u8]), Empty> {
220         self.dequeue_padding();
221 
222         if let Some(metadata) = self.metadata_ring.get_allocated(0, 1).first() {
223             Ok((
224                 metadata.header.as_ref().unwrap(),
225                 self.payload_ring.get_allocated(0, metadata.size),
226             ))
227         } else {
228             Err(Empty)
229         }
230     }
231 
232     /// Return the maximum number packets that can be stored.
packet_capacity(&self) -> usize233     pub fn packet_capacity(&self) -> usize {
234         self.metadata_ring.capacity()
235     }
236 
237     /// Return the maximum number of bytes in the payload ring buffer.
payload_capacity(&self) -> usize238     pub fn payload_capacity(&self) -> usize {
239         self.payload_ring.capacity()
240     }
241 
242     /// Reset the packet buffer and clear any staged.
243     #[allow(unused)]
reset(&mut self)244     pub(crate) fn reset(&mut self) {
245         self.payload_ring.clear();
246         self.metadata_ring.clear();
247     }
248 }
249 
250 #[cfg(test)]
251 mod test {
252     use super::*;
253 
buffer() -> PacketBuffer<'static, ()>254     fn buffer() -> PacketBuffer<'static, ()> {
255         PacketBuffer::new(vec![PacketMetadata::EMPTY; 4], vec![0u8; 16])
256     }
257 
258     #[test]
test_simple()259     fn test_simple() {
260         let mut buffer = buffer();
261         buffer.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef");
262         assert_eq!(buffer.enqueue(16, ()), Err(Full));
263         assert_eq!(buffer.metadata_ring.len(), 1);
264         assert_eq!(buffer.dequeue().unwrap().1, &b"abcdef"[..]);
265         assert_eq!(buffer.dequeue(), Err(Empty));
266     }
267 
268     #[test]
test_peek()269     fn test_peek() {
270         let mut buffer = buffer();
271         assert_eq!(buffer.peek(), Err(Empty));
272         buffer.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef");
273         assert_eq!(buffer.metadata_ring.len(), 1);
274         assert_eq!(buffer.peek().unwrap().1, &b"abcdef"[..]);
275         assert_eq!(buffer.dequeue().unwrap().1, &b"abcdef"[..]);
276         assert_eq!(buffer.peek(), Err(Empty));
277     }
278 
279     #[test]
test_padding()280     fn test_padding() {
281         let mut buffer = buffer();
282         assert!(buffer.enqueue(6, ()).is_ok());
283         assert!(buffer.enqueue(8, ()).is_ok());
284         assert!(buffer.dequeue().is_ok());
285         buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd");
286         assert_eq!(buffer.metadata_ring.len(), 3);
287         assert!(buffer.dequeue().is_ok());
288 
289         assert_eq!(buffer.dequeue().unwrap().1, &b"abcd"[..]);
290         assert_eq!(buffer.metadata_ring.len(), 0);
291     }
292 
293     #[test]
test_padding_with_large_payload()294     fn test_padding_with_large_payload() {
295         let mut buffer = buffer();
296         assert!(buffer.enqueue(12, ()).is_ok());
297         assert!(buffer.dequeue().is_ok());
298         buffer
299             .enqueue(12, ())
300             .unwrap()
301             .copy_from_slice(b"abcdefghijkl");
302     }
303 
304     #[test]
test_dequeue_with()305     fn test_dequeue_with() {
306         let mut buffer = buffer();
307         assert!(buffer.enqueue(6, ()).is_ok());
308         assert!(buffer.enqueue(8, ()).is_ok());
309         assert!(buffer.dequeue().is_ok());
310         buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd");
311         assert_eq!(buffer.metadata_ring.len(), 3);
312         assert!(buffer.dequeue().is_ok());
313 
314         assert!(matches!(
315             buffer.dequeue_with(|_, _| Result::<(), u32>::Err(123)),
316             Ok(Err(_))
317         ));
318         assert_eq!(buffer.metadata_ring.len(), 1);
319 
320         assert!(buffer
321             .dequeue_with(|&mut (), payload| {
322                 assert_eq!(payload, &b"abcd"[..]);
323                 Result::<(), ()>::Ok(())
324             })
325             .is_ok());
326         assert_eq!(buffer.metadata_ring.len(), 0);
327     }
328 
329     #[test]
test_metadata_full_empty()330     fn test_metadata_full_empty() {
331         let mut buffer = buffer();
332         assert!(buffer.is_empty());
333         assert!(!buffer.is_full());
334         assert!(buffer.enqueue(1, ()).is_ok());
335         assert!(!buffer.is_empty());
336         assert!(buffer.enqueue(1, ()).is_ok());
337         assert!(buffer.enqueue(1, ()).is_ok());
338         assert!(!buffer.is_full());
339         assert!(!buffer.is_empty());
340         assert!(buffer.enqueue(1, ()).is_ok());
341         assert!(buffer.is_full());
342         assert!(!buffer.is_empty());
343         assert_eq!(buffer.metadata_ring.len(), 4);
344         assert_eq!(buffer.enqueue(1, ()), Err(Full));
345     }
346 
347     #[test]
test_window_too_small()348     fn test_window_too_small() {
349         let mut buffer = buffer();
350         assert!(buffer.enqueue(4, ()).is_ok());
351         assert!(buffer.enqueue(8, ()).is_ok());
352         assert!(buffer.dequeue().is_ok());
353         assert_eq!(buffer.enqueue(16, ()), Err(Full));
354         assert_eq!(buffer.metadata_ring.len(), 1);
355     }
356 
357     #[test]
test_contiguous_window_too_small()358     fn test_contiguous_window_too_small() {
359         let mut buffer = buffer();
360         assert!(buffer.enqueue(4, ()).is_ok());
361         assert!(buffer.enqueue(8, ()).is_ok());
362         assert!(buffer.dequeue().is_ok());
363         assert_eq!(buffer.enqueue(8, ()), Err(Full));
364         assert_eq!(buffer.metadata_ring.len(), 1);
365     }
366 
367     #[test]
test_contiguous_window_wrap()368     fn test_contiguous_window_wrap() {
369         let mut buffer = buffer();
370         assert!(buffer.enqueue(15, ()).is_ok());
371         assert!(buffer.dequeue().is_ok());
372         assert!(buffer.enqueue(16, ()).is_ok());
373     }
374 
375     #[test]
test_capacity_too_small()376     fn test_capacity_too_small() {
377         let mut buffer = buffer();
378         assert_eq!(buffer.enqueue(32, ()), Err(Full));
379     }
380 
381     #[test]
test_contig_window_prioritized()382     fn test_contig_window_prioritized() {
383         let mut buffer = buffer();
384         assert!(buffer.enqueue(4, ()).is_ok());
385         assert!(buffer.dequeue().is_ok());
386         assert!(buffer.enqueue(5, ()).is_ok());
387     }
388 
389     #[test]
clear()390     fn clear() {
391         let mut buffer = buffer();
392 
393         // Ensure enqueuing data in teh buffer fills it somewhat.
394         assert!(buffer.is_empty());
395         assert!(buffer.enqueue(6, ()).is_ok());
396 
397         // Ensure that resetting the buffer causes it to be empty.
398         assert!(!buffer.is_empty());
399         buffer.reset();
400         assert!(buffer.is_empty());
401     }
402 }
403