smoltcp/storage/
packet_buffer.rs

1use managed::ManagedSlice;
2
3use crate::storage::{Full, RingBuffer};
4
5use super::Empty;
6
/// Describes one entry of a packet buffer: how many payload bytes it covers
/// and, for a real packet, the header that goes with them.
///
/// An entry without a header is a padding entry.
#[derive(Debug, Clone, Copy)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct PacketMetadata<H> {
    /// Number of payload bytes covered by this entry.
    size: usize,
    /// Header of the packet; `None` marks the entry as padding.
    header: Option<H>,
}

impl<H> PacketMetadata<H> {
    /// Empty packet description.
    pub const EMPTY: PacketMetadata<H> = Self {
        size: 0,
        header: None,
    };

    /// Describe `size` bytes of padding (an entry that carries no header).
    fn padding(size: usize) -> PacketMetadata<H> {
        Self { size, header: None }
    }

    /// Describe a packet of `size` payload bytes carrying `header`.
    fn packet(size: usize, header: H) -> PacketMetadata<H> {
        let header = Some(header);
        Self { size, header }
    }

    /// Whether this entry is padding, i.e. has no header.
    fn is_padding(&self) -> bool {
        self.header.is_none()
    }
}
40
41/// An UDP packet ring buffer.
42#[derive(Debug)]
43pub struct PacketBuffer<'a, H: 'a> {
44    metadata_ring: RingBuffer<'a, PacketMetadata<H>>,
45    payload_ring: RingBuffer<'a, u8>,
46}
47
48impl<'a, H> PacketBuffer<'a, H> {
49    /// Create a new packet buffer with the provided metadata and payload storage.
50    ///
51    /// Metadata storage limits the maximum _number_ of packets in the buffer and payload
52    /// storage limits the maximum _total size_ of packets.
53    pub fn new<MS, PS>(metadata_storage: MS, payload_storage: PS) -> PacketBuffer<'a, H>
54    where
55        MS: Into<ManagedSlice<'a, PacketMetadata<H>>>,
56        PS: Into<ManagedSlice<'a, u8>>,
57    {
58        PacketBuffer {
59            metadata_ring: RingBuffer::new(metadata_storage),
60            payload_ring: RingBuffer::new(payload_storage),
61        }
62    }
63
64    /// Query whether the buffer is empty.
65    pub fn is_empty(&self) -> bool {
66        self.metadata_ring.is_empty()
67    }
68
69    /// Query whether the buffer is full.
70    pub fn is_full(&self) -> bool {
71        self.metadata_ring.is_full()
72    }
73
74    // There is currently no enqueue_with() because of the complexity of managing padding
75    // in case of failure.
76
77    /// Enqueue a single packet with the given header into the buffer, and
78    /// return a reference to its payload, or return `Err(Full)`
79    /// if the buffer is full.
80    pub fn enqueue(&mut self, size: usize, header: H) -> Result<&mut [u8], Full> {
81        if self.payload_ring.capacity() < size || self.metadata_ring.is_full() {
82            return Err(Full);
83        }
84
85        // Ring is currently empty.  Clear it (resetting `read_at`) to maximize
86        // for contiguous space.
87        if self.payload_ring.is_empty() {
88            self.payload_ring.clear();
89        }
90
91        let window = self.payload_ring.window();
92        let contig_window = self.payload_ring.contiguous_window();
93
94        if window < size {
95            return Err(Full);
96        } else if contig_window < size {
97            if window - contig_window < size {
98                // The buffer length is larger than the current contiguous window
99                // and is larger than the contiguous window will be after adding
100                // the padding necessary to circle around to the beginning of the
101                // ring buffer.
102                return Err(Full);
103            } else {
104                // Add padding to the end of the ring buffer so that the
105                // contiguous window is at the beginning of the ring buffer.
106                *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(contig_window);
107                // note(discard): function does not write to the result
108                // enqueued padding buffer location
109                let _buf_enqueued = self.payload_ring.enqueue_many(contig_window);
110            }
111        }
112
113        *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header);
114
115        let payload_buf = self.payload_ring.enqueue_many(size);
116        debug_assert!(payload_buf.len() == size);
117        Ok(payload_buf)
118    }
119
120    /// Call `f` with a packet from the buffer large enough to fit `max_size` bytes. The packet
121    /// is shrunk to the size returned from `f` and enqueued into the buffer.
122    pub fn enqueue_with_infallible<'b, F>(
123        &'b mut self,
124        max_size: usize,
125        header: H,
126        f: F,
127    ) -> Result<usize, Full>
128    where
129        F: FnOnce(&'b mut [u8]) -> usize,
130    {
131        if self.payload_ring.capacity() < max_size || self.metadata_ring.is_full() {
132            return Err(Full);
133        }
134
135        let window = self.payload_ring.window();
136        let contig_window = self.payload_ring.contiguous_window();
137
138        if window < max_size {
139            return Err(Full);
140        } else if contig_window < max_size {
141            if window - contig_window < max_size {
142                // The buffer length is larger than the current contiguous window
143                // and is larger than the contiguous window will be after adding
144                // the padding necessary to circle around to the beginning of the
145                // ring buffer.
146                return Err(Full);
147            } else {
148                // Add padding to the end of the ring buffer so that the
149                // contiguous window is at the beginning of the ring buffer.
150                *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(contig_window);
151                // note(discard): function does not write to the result
152                // enqueued padding buffer location
153                let _buf_enqueued = self.payload_ring.enqueue_many(contig_window);
154            }
155        }
156
157        let (size, _) = self
158            .payload_ring
159            .enqueue_many_with(|data| (f(&mut data[..max_size]), ()));
160
161        *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header);
162
163        Ok(size)
164    }
165
166    fn dequeue_padding(&mut self) {
167        let _ = self.metadata_ring.dequeue_one_with(|metadata| {
168            if metadata.is_padding() {
169                // note(discard): function does not use value of dequeued padding bytes
170                let _buf_dequeued = self.payload_ring.dequeue_many(metadata.size);
171                Ok(()) // dequeue metadata
172            } else {
173                Err(()) // don't dequeue metadata
174            }
175        });
176    }
177
178    /// Call `f` with a single packet from the buffer, and dequeue the packet if `f`
179    /// returns successfully, or return `Err(EmptyError)` if the buffer is empty.
180    pub fn dequeue_with<'c, R, E, F>(&'c mut self, f: F) -> Result<Result<R, E>, Empty>
181    where
182        F: FnOnce(&mut H, &'c mut [u8]) -> Result<R, E>,
183    {
184        self.dequeue_padding();
185
186        self.metadata_ring.dequeue_one_with(|metadata| {
187            self.payload_ring
188                .dequeue_many_with(|payload_buf| {
189                    debug_assert!(payload_buf.len() >= metadata.size);
190
191                    match f(
192                        metadata.header.as_mut().unwrap(),
193                        &mut payload_buf[..metadata.size],
194                    ) {
195                        Ok(val) => (metadata.size, Ok(val)),
196                        Err(err) => (0, Err(err)),
197                    }
198                })
199                .1
200        })
201    }
202
203    /// Dequeue a single packet from the buffer, and return a reference to its payload
204    /// as well as its header, or return `Err(Error::Exhausted)` if the buffer is empty.
205    pub fn dequeue(&mut self) -> Result<(H, &mut [u8]), Empty> {
206        self.dequeue_padding();
207
208        let meta = self.metadata_ring.dequeue_one()?;
209
210        let payload_buf = self.payload_ring.dequeue_many(meta.size);
211        debug_assert!(payload_buf.len() == meta.size);
212        Ok((meta.header.take().unwrap(), payload_buf))
213    }
214
215    /// Peek at a single packet from the buffer without removing it, and return a reference to
216    /// its payload as well as its header, or return `Err(Error:Exhausted)` if the buffer is empty.
217    ///
218    /// This function otherwise behaves identically to [dequeue](#method.dequeue).
219    pub fn peek(&mut self) -> Result<(&H, &[u8]), Empty> {
220        self.dequeue_padding();
221
222        if let Some(metadata) = self.metadata_ring.get_allocated(0, 1).first() {
223            Ok((
224                metadata.header.as_ref().unwrap(),
225                self.payload_ring.get_allocated(0, metadata.size),
226            ))
227        } else {
228            Err(Empty)
229        }
230    }
231
232    /// Return the maximum number packets that can be stored.
233    pub fn packet_capacity(&self) -> usize {
234        self.metadata_ring.capacity()
235    }
236
237    /// Return the maximum number of bytes in the payload ring buffer.
238    pub fn payload_capacity(&self) -> usize {
239        self.payload_ring.capacity()
240    }
241
242    /// Return the current number of bytes in the payload ring buffer.
243    pub fn payload_bytes_count(&self) -> usize {
244        self.payload_ring.len()
245    }
246
247    /// Reset the packet buffer and clear any staged.
248    #[allow(unused)]
249    pub(crate) fn reset(&mut self) {
250        self.payload_ring.clear();
251        self.metadata_ring.clear();
252    }
253}
254
#[cfg(test)]
mod test {
    use super::*;

    /// Fixture: four metadata slots backed by sixteen payload bytes.
    fn buffer() -> PacketBuffer<'static, ()> {
        let metadata_storage = vec![PacketMetadata::EMPTY; 4];
        let payload_storage = vec![0u8; 16];
        PacketBuffer::new(metadata_storage, payload_storage)
    }

    #[test]
    fn test_simple() {
        let mut pkts = buffer();
        pkts.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef");
        // With six bytes in use, a sixteen-byte packet no longer fits.
        assert_eq!(pkts.enqueue(16, ()), Err(Full));
        assert_eq!(pkts.metadata_ring.len(), 1);

        let (_, payload) = pkts.dequeue().unwrap();
        assert_eq!(payload, &b"abcdef"[..]);
        assert_eq!(pkts.dequeue(), Err(Empty));
    }

    #[test]
    fn test_peek() {
        let mut pkts = buffer();
        assert_eq!(pkts.peek(), Err(Empty));

        pkts.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef");
        assert_eq!(pkts.metadata_ring.len(), 1);

        // Peeking leaves the packet in place for the following dequeue.
        assert_eq!(pkts.peek().unwrap().1, &b"abcdef"[..]);
        assert_eq!(pkts.dequeue().unwrap().1, &b"abcdef"[..]);
        assert_eq!(pkts.peek(), Err(Empty));
    }

    #[test]
    fn test_padding() {
        let mut pkts = buffer();
        assert!(pkts.enqueue(6, ()).is_ok());
        assert!(pkts.enqueue(8, ()).is_ok());
        assert!(pkts.dequeue().is_ok());

        // Only two bytes remain at the end, so this packet wraps around
        // behind a padding entry.
        pkts.enqueue(4, ()).unwrap().copy_from_slice(b"abcd");
        assert_eq!(pkts.metadata_ring.len(), 3);
        assert!(pkts.dequeue().is_ok());

        let (_, payload) = pkts.dequeue().unwrap();
        assert_eq!(payload, &b"abcd"[..]);
        assert_eq!(pkts.metadata_ring.len(), 0);
    }

    #[test]
    fn test_padding_with_large_payload() {
        let mut pkts = buffer();
        assert!(pkts.enqueue(12, ()).is_ok());
        assert!(pkts.dequeue().is_ok());

        let slot = pkts.enqueue(12, ()).unwrap();
        slot.copy_from_slice(b"abcdefghijkl");
    }

    #[test]
    fn test_dequeue_with() {
        let mut pkts = buffer();
        assert!(pkts.enqueue(6, ()).is_ok());
        assert!(pkts.enqueue(8, ()).is_ok());
        assert!(pkts.dequeue().is_ok());
        pkts.enqueue(4, ()).unwrap().copy_from_slice(b"abcd");
        assert_eq!(pkts.metadata_ring.len(), 3);
        assert!(pkts.dequeue().is_ok());

        // A failing callback removes the padding but keeps the packet.
        let rejected = pkts.dequeue_with(|_, _| Err::<(), u32>(123));
        assert!(matches!(rejected, Ok(Err(_))));
        assert_eq!(pkts.metadata_ring.len(), 1);

        // A succeeding callback sees the payload and removes the packet.
        let accepted = pkts.dequeue_with(|&mut (), payload| {
            assert_eq!(payload, &b"abcd"[..]);
            Ok::<(), ()>(())
        });
        assert!(accepted.is_ok());
        assert_eq!(pkts.metadata_ring.len(), 0);
    }

    #[test]
    fn test_metadata_full_empty() {
        let mut pkts = buffer();
        assert!(pkts.is_empty());
        assert!(!pkts.is_full());

        assert!(pkts.enqueue(1, ()).is_ok());
        assert!(!pkts.is_empty());

        assert!(pkts.enqueue(1, ()).is_ok());
        assert!(pkts.enqueue(1, ()).is_ok());
        assert!(!pkts.is_full());
        assert!(!pkts.is_empty());

        // The fourth packet uses up the last metadata slot.
        assert!(pkts.enqueue(1, ()).is_ok());
        assert!(pkts.is_full());
        assert!(!pkts.is_empty());
        assert_eq!(pkts.metadata_ring.len(), 4);
        assert_eq!(pkts.enqueue(1, ()), Err(Full));
    }

    #[test]
    fn test_window_too_small() {
        let mut pkts = buffer();
        assert!(pkts.enqueue(4, ()).is_ok());
        assert!(pkts.enqueue(8, ()).is_ok());
        assert!(pkts.dequeue().is_ok());

        assert_eq!(pkts.enqueue(16, ()), Err(Full));
        assert_eq!(pkts.metadata_ring.len(), 1);
    }

    #[test]
    fn test_contiguous_window_too_small() {
        let mut pkts = buffer();
        assert!(pkts.enqueue(4, ()).is_ok());
        assert!(pkts.enqueue(8, ()).is_ok());
        assert!(pkts.dequeue().is_ok());

        // Eight bytes are free in total, but split four and four.
        assert_eq!(pkts.enqueue(8, ()), Err(Full));
        assert_eq!(pkts.metadata_ring.len(), 1);
    }

    #[test]
    fn test_contiguous_window_wrap() {
        let mut pkts = buffer();
        assert!(pkts.enqueue(15, ()).is_ok());
        assert!(pkts.dequeue().is_ok());
        // The drained buffer must offer its whole storage again.
        assert!(pkts.enqueue(16, ()).is_ok());
    }

    #[test]
    fn test_capacity_too_small() {
        let mut pkts = buffer();
        assert_eq!(pkts.enqueue(32, ()), Err(Full));
    }

    #[test]
    fn test_contig_window_prioritized() {
        let mut pkts = buffer();
        assert!(pkts.enqueue(4, ()).is_ok());
        assert!(pkts.dequeue().is_ok());
        assert!(pkts.enqueue(5, ()).is_ok());
    }

    #[test]
    fn clear() {
        let mut pkts = buffer();

        // Enqueuing a packet makes the buffer non-empty.
        assert!(pkts.is_empty());
        assert!(pkts.enqueue(6, ()).is_ok());

        // Resetting the buffer makes it empty again.
        assert!(!pkts.is_empty());
        pkts.reset();
        assert!(pkts.is_empty());
    }
}