kernel/utilities/
streaming_process_slice.rs

1// Licensed under the Apache License, Version 2.0 or the MIT License.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3// Copyright Tock Contributors 2024.
4
5//! Module containing the [`StreamingProcessSlice`] abstraction and
6//! related infrastructure. See the documentation on
7//! [`StreamingProcessSlice`] for more information.
8
9use core::ops::{Range, RangeFrom};
10
11use crate::processbuffer::WriteableProcessSlice;
12use crate::utilities::registers::{register_bitfields, LocalRegisterCopy};
13use crate::ErrorCode;
14
15/// A wrapper around a [`WriteableProcessSlice`] for streaming data from the
16/// kernel to a userspace process.
17///
18/// Applications like ADC sampling or network stacks require the kernel to
19/// provide a process with a continuous, lossless stream of data from a source
20/// that is not rate-controlled by the process. This wrapper implements the
21/// kernel-side of a simple protocol to achieve this goal, without requiring
22/// kernel-side buffering and by utilizing the atomic swap semantics of Tock's
23/// `allow` system call. The protocol is versioned; the semantics for version 0
24/// are as follows:
25///
26/// 1. To receive a data stream from the kernel, a userspace process allocates
27///    two buffers.
28///
29/// 2. The first buffer is prepared according to the format below. The `flags`
30///    field's version bits are set to `0`. The process clears the `exceeded`
31///    flag. It may set or clear the `halt` flag. All reserved flags must be set
32///    to `0`. Finally, the `offset` bytes (interpreted as a u32 value in native
33///    endianness) are set to `0`.
34///
35/// 3. The process `allow`s this buffer to a kernel driver.
36///
37/// 4. The kernel driver writes incoming data starting at the `data` field +
38///    `offset` bytes. After each write, the kernel increments `offset` by the
39///    number of bytes written.
40///
41///    For each *chunk* written to the buffer (where a *chunk* is an
42///    application-defined construct, such as a network packet), the kernel only
43///    increments `offset` if the full chunk was successfully written into the
44///    buffer. The kernel may or may not modify any data after the current
45///    `offset` value, regardless of whether any header fields were updated. The
46///    kernel never modifies any data in the region of
47///    `[data.start; data.start + offset)`.
48///
49///    Should the write of a chunk fail because the buffer has insufficient
50///    space left, the kernel will set the `exceeded` flag bit (index 0).
51///
52///    The `halt` flag bit as set by the process governs the kernel's behavior
53///    once the `exceeded` flag is set: if `halt` is cleared, the kernel will
54///    attempt to write future, smaller chunks to the buffer (and thus implicitly
55///    discarding some packets). If `halt` and `exceeded` are both set, the
56///    kernel will stop writing any data into the buffer.
57///
58/// 5. The kernel will schedule an upcall to the process, indicating that a
59///    write to the buffer (or setting the `exceeded`) flag occurred. The kernel
60///    may schedule only one upcall for the first chunk written to the buffer,
61///    or multiple upcalls (e.g., one upcall per chunk written). A process must
62///    not rely on the number of upcalls received and instead rely on the buffer
63///    header (`offset` and the `flags` bits) to determine the amount of data
64///    written to the buffer.
65///
66/// 6. The process prepares its second buffer, following step 2. The process
67///    then issues an `allow` operation that atomically swaps the current
68///    allowed buffer by its second buffer.
69///
70/// 7. The process can now process the received chunks contained in the initial
71///    buffer, while the kernel receives new chunks in the other, newly allowed
72///    buffer.
73///
74/// As the kernel cannot track if an `allow`ed buffer for a particular
75/// [`SyscallDriver`](crate::syscall_driver::SyscallDriver) implementation is intended to be a
76/// [`StreamingProcessSlice`], the kernel must use the header in the buffer as
77/// provided by the process. The implementation of [`StreamingProcessSlice`]
78/// ensures that an incorrect header will not cause a panic, but incoming
79/// packets could be dropped. A process using a syscall API that uses a
80/// [`StreamingProcessSlice`] must ensure it has properly initialized the header
81/// before `allow`ing the buffer.
82///
83/// The version 0 buffer format is specified as follows:
84/// ```text,ignore
85/// 0           2           4           6           8
86/// +-----------+-----------+-----------------------+----------...
87/// | version   | flags     | write offset (32 bit) | data
88/// +-----------+-----------+-----------------------+----------...
89/// | 000...000 | x{14},H,E | <native endian u32>   |
90/// +-----------+-----------+-----------------------+----------...
91/// ```
92///
93/// The `version` field is a u16 integer stored in the target's native
94/// endianness. The `flags` field is a bitfield laid out as shown in the
95/// diagram above (laid out in big endian, with `E` being the least significant
96/// bit at byte 3). The `offset` field is a u32 integer stored in the target's
97/// native endianness.
98///
99/// The kernel does not impose any alignment restrictions on
100/// `StreamingProcessSlice`s of version 0.
101///
102/// The flags field is structured as follows:
103/// - `V`: version bits. This kernel only supports version `0`.
104/// - `H`: `halt` flag. If this flag is set and the `exceeded` flag is set, the
105///   kernel will not write any further data to this buffer.
106/// - `E`: `exceeded` flag. The kernel sets this flag when the remaining buffer
107///   capacity is insufficient to append the current chunk.
108/// - `x{14}`: reserved flag bits. Unless specified otherwise, processes must clear
109///   these flags prior to `allow`ing a buffer to the kernel. A kernel that does
110///   not know of a reserved flag must refuse to operate on a buffer that has
111///   such a flag set.
112#[repr(transparent)]
113pub struct StreamingProcessSlice<'a> {
114    slice: &'a WriteableProcessSlice,
115}
116
117register_bitfields![
118    u16,
119    pub StreamingProcessSliceFlags [
120        RESERVED OFFSET(2) NUMBITS(14) [
121            RESERVED0 = 0x00,
122        ],
123        HALT OFFSET(1) NUMBITS(1) [],
124        EXCEEDED OFFSET(0) NUMBITS(1) [],
125    ]
126];
127
128/// Fields in the `StreamingProcessSlice` buffer header.
129#[derive(Debug, Clone, Copy, PartialEq, Eq)]
130pub struct StreamingProcessSliceHeader {
131    pub version: u16,
132    pub halt: bool,
133    pub exceeded: bool,
134    pub offset: u32,
135}
136
137impl<'a> StreamingProcessSlice<'a> {
138    const RANGE_VERSION: Range<usize> = 0..2;
139    const RANGE_FLAGS: Range<usize> = (Self::RANGE_VERSION.end)..(Self::RANGE_VERSION.end + 2);
140    const RANGE_OFFSET: Range<usize> = (Self::RANGE_FLAGS.end)..(Self::RANGE_FLAGS.end + 4);
141    const RANGE_DATA: RangeFrom<usize> = (Self::RANGE_OFFSET.end)..;
142
143    pub fn new(slice: &'a WriteableProcessSlice) -> StreamingProcessSlice<'a> {
144        StreamingProcessSlice { slice }
145    }
146
147    /// Checks whether the buffer is valid (of sufficient size to contain at
148    /// least the `flags` and `offset` fields), and extract the `flags` and
149    /// `offset` field values.
150    ///
151    /// This function fails with
152    /// - `INVAL`: if the version is not `0`, or the reserved flags are not
153    ///   cleared.
154    /// - `SIZE`: if the underlying slice is not large enough to fit the
155    ///   flags field and the offset field.
156    fn get_header(&self) -> Result<StreamingProcessSliceHeader, ErrorCode> {
157        let mut version_bytes = [0_u8; 2];
158        self.slice
159            .get(Self::RANGE_VERSION)
160            .ok_or(ErrorCode::SIZE)?
161            .copy_to_slice_or_err(&mut version_bytes)
162            .map_err(|_| ErrorCode::SIZE)?;
163
164        let version = u16::from_be_bytes(version_bytes);
165        if version != 0 {
166            return Err(ErrorCode::INVAL);
167        }
168
169        let mut flags_bytes = [0_u8; 2];
170        self.slice
171            .get(Self::RANGE_FLAGS)
172            .ok_or(ErrorCode::SIZE)?
173            .copy_to_slice_or_err(&mut flags_bytes)
174            .map_err(|_| ErrorCode::SIZE)?;
175
176        let flags: LocalRegisterCopy<u16, StreamingProcessSliceFlags::Register> =
177            LocalRegisterCopy::new(u16::from_be_bytes(flags_bytes));
178
179        if flags.read_as_enum(StreamingProcessSliceFlags::RESERVED)
180            != Some(StreamingProcessSliceFlags::RESERVED::Value::RESERVED0)
181        {
182            return Err(ErrorCode::INVAL);
183        }
184
185        let mut offset_bytes = [0_u8; Self::RANGE_OFFSET.end - Self::RANGE_OFFSET.start];
186        self.slice
187            .get(Self::RANGE_OFFSET)
188            .ok_or(ErrorCode::SIZE)?
189            .copy_to_slice_or_err(&mut offset_bytes)
190            .map_err(|_| ErrorCode::SIZE)?;
191
192        Ok(StreamingProcessSliceHeader {
193            version,
194            halt: flags.read(StreamingProcessSliceFlags::HALT) != 0,
195            exceeded: flags.read(StreamingProcessSliceFlags::EXCEEDED) != 0,
196            offset: u32::from_ne_bytes(offset_bytes),
197        })
198    }
199
200    /// Write updated header (`version`, `flags` and `offset`) back to the
201    /// underlying slice.
202    ///
203    /// This function does not perform any sanity checks on the `header`
204    /// argument.  In particular, users of this function must ensure that they
205    /// previously extracted the written-back [`StreamingProcessSliceHeader`]
206    /// argument from the buffer, do not modify the version, do not change any
207    /// flags that are controlled by the process or otherwise violate the
208    /// protocol, and correctly increment the `offset` value.
209    ///
210    /// - `SIZE`: if the underlying slice is not large enough to fit the
211    ///   flags field and the offset field.
212    fn write_header(&self, header: StreamingProcessSliceHeader) -> Result<(), ErrorCode> {
213        // Write the offset first, to avoid modifying the buffer if it's too
214        // small to fit the offset, but large enough to hold the flags byte:
215        let offset_bytes = u32::to_ne_bytes(header.offset);
216        self.slice
217            .get(Self::RANGE_OFFSET)
218            .ok_or(ErrorCode::SIZE)?
219            .copy_from_slice_or_err(&offset_bytes)
220            .map_err(|_| ErrorCode::SIZE)?;
221
222        let version_bytes: [u8; 2] = u16::to_ne_bytes(header.version);
223        self.slice
224            .get(Self::RANGE_VERSION)
225            .ok_or(ErrorCode::SIZE)?
226            .copy_from_slice_or_err(&version_bytes)
227            .map_err(|_| ErrorCode::SIZE)?;
228
229        let flags_bytes: [u8; 2] = u16::to_be_bytes(
230            (StreamingProcessSliceFlags::RESERVED::RESERVED0
231                + StreamingProcessSliceFlags::HALT.val(header.halt as u16)
232                + StreamingProcessSliceFlags::EXCEEDED.val(header.exceeded as u16))
233            .value,
234        );
235        self.slice
236            .get(Self::RANGE_FLAGS)
237            .ok_or(ErrorCode::SIZE)?
238            .copy_from_slice_or_err(&flags_bytes)
239            .map_err(|_| ErrorCode::SIZE)?;
240
241        Ok(())
242    }
243
244    /// Access the payload data portion of the underlying slice.
245    ///
246    /// This method does not perform any validation of the buffer version or
247    /// data. It must only be used on slices of version 0. If the underlying
248    /// slice is too small to hold the header fields, this will return an empty
249    /// slice.
250    fn payload_slice(&self) -> &WriteableProcessSlice {
251        self.slice
252            .get(Self::RANGE_DATA)
253            .unwrap_or((&mut [][..]).into())
254    }
255
256    /// Append a chunk of data to the slice.
257    ///
258    /// If the underlying slice has a correct `flags` and `offset` value, is not
259    /// halted, and has sufficient space for this `data` chunk, this function
260    /// returns the updated buffer offset (set to one past the last written
261    /// byte).
262    ///
263    /// This function returns whether this chunk was the first non-zero-length
264    /// `chunk` appended to the slice, and the offset after the append operation
265    /// (where the next chunk would be written in the data section).
266    ///
267    /// This function fails with:
268    /// - `INVAL`: if the version is not `0`, or the reserved flags are not
269    ///   cleared.
270    /// - `BUSY`: if both the `halt` and `exceeded` flags are set. In this case,
271    ///   the slice will not be modified.
272    /// - `SIZE`: if the underlying slice is not large enough to fit the
273    ///   flags field and the offset field. In this case, the
274    ///   `exceeded` flag will be set and the slice will not be modified.
275    /// - `FAIL`: would need to increment offset beyond 2**32 - 1. Neither the
276    ///   payload slice nor any header fields will be modified.
277    ///
278    /// Appending a zero-length `chunk` will be treated as every other chunk,
279    /// but appending it will not set the exceeded flag, even if `offset` is at
280    /// the maximum position for this buffer. A zero-length append operation can
281    /// still fail due to the buffer being halted, having an improper header,
282    /// etc. A zero-length `chunk` will never be treated as the first chunk
283    /// appended to a buffer.
284    pub fn append_chunk(&self, chunk: &[u8]) -> Result<(bool, u32), ErrorCode> {
285        // This includes general sanity checks:
286        let mut header = self.get_header()?;
287
288        // Check whether we are instructed to halt:
289        if header.exceeded && header.halt {
290            return Err(ErrorCode::BUSY);
291        }
292
293        let previous_offset = header.offset;
294
295        let new_offset: u32 = (previous_offset as usize)
296            .checked_add(chunk.len())
297            .ok_or(ErrorCode::FAIL)?
298            .try_into()
299            .map_err(|_| ErrorCode::FAIL)?;
300
301        // Attempt to append the chunk to the slice, otherwise fail with SIZE:
302        if let Some(dst) = self
303            .payload_slice()
304            .get((previous_offset as usize)..(new_offset as usize))
305        {
306            // We do have sufficient space to append this chunk to the slice:
307            dst.copy_from_slice(chunk);
308            header.offset = new_offset;
309            self.write_header(header)?;
310            Ok((previous_offset == 0 && chunk.len() != 0, new_offset))
311        } else {
312            // We don't have sufficient space to append this chunk to the slice.
313            // Do not update header.offset, but set header.exceeded:
314            header.exceeded = true;
315            self.write_header(header)?;
316            Err(ErrorCode::SIZE)
317        }
318    }
319
320    /// Append a chunk of data from an iterator.
321    ///
322    /// If the underlying slice has a correct `flags` and `offset` value, is not
323    /// halted, and has sufficient space for this `data` chunk, this function
324    /// returns the updated buffer offset (set to one past the last written
325    /// byte).
326    ///
327    /// This function returns whether this chunk was the first non-zero-length
328    /// `chunk` appended to the slice, and the offset after the append operation
329    /// (where the next chunk would be written in the data section).
330    ///
331    /// If the buffer does not have enough space, this function will still
332    /// partially copy this chunk and modify the slice payload data after
333    /// `offset`. It will not update the `offset` header field though, and
334    /// instead set the `exceeded` flag.
335    ///
336    /// This function fails with:
337    /// - `INVAL`: if the version is not `0`, or the reserved flags are not
338    ///   cleared.
339    /// - `BUSY`: if both the `halt` and `exceeded` flags are set. In this case,
340    ///   the slice will not be modified.
341    /// - `SIZE`: if the underlying slice is not large enough to fit the
342    ///   flags field and the offset field. In this case, the
343    ///   `exceeded` flag will be set and the slice will not be modified.
344    /// - `FAIL`: would need to increment offset beyond 2**32 - 1. Neither the
345    ///   payload slice nor any header fields will be modified.
346    ///
347    /// Appending a zero-length `chunk` will be treated as every other chunk,
348    /// but appending it will not set the exceeded flag, even if `offset` is at
349    /// the maximum position for this buffer. A zero-length append operation can
350    /// still fail due to the buffer being halted, having an improper header,
351    /// etc. A zero-length `chunk` will never be treated as the first chunk
352    /// appended to a buffer.
353    pub fn append_chunk_from_iter<I: IntoIterator<Item = u8>>(
354        &self,
355        src: I,
356    ) -> Result<(bool, u32), ErrorCode> {
357        // This includes general sanity checks:
358        let mut header = self.get_header()?;
359
360        // Check whether we are instructed to halt:
361        if header.exceeded && header.halt {
362            return Err(ErrorCode::BUSY);
363        }
364
365        // Create a subslice over the remaining payload space:
366        let remaining_payload_slice = self
367            .payload_slice()
368            .get((header.offset as usize)..)
369            // If the iterator yields 0 elements, even if the offset
370            // lies at the end or outside of the payload slice, we
371            // still don't want to return an error.
372            .unwrap_or((&mut [][..]).into());
373
374        // Create a mutable iterator over the remaining payload space:
375        let mut remaining_payload_iter = remaining_payload_slice.iter();
376
377        // We don't know how many bytes the `src` iterator will yield. Try to
378        // copy from it and abort if we run out of space on the payload iter.
379        //
380        // We don't use `zip` here, as that would silently truncate the `src`
381        // iter if the `payload` iter runs out of elements.
382        let bytes_written_or_out_of_space = src
383            .into_iter()
384            // Combine this byte with one of the payload slice. This is
385            // different from `zip` in that we keep iterating even if we hit
386            // `None` on the payload iter:
387            .map(|src_byte| {
388                remaining_payload_iter
389                    .next()
390                    .map(|payload_byte| (payload_byte, src_byte))
391            })
392            // If we managed to get a `Some(Cell<u8>)`, write a byte from the
393            // `src` to the payload slice and return `true`, else `false`.
394            .map(|opt| opt.map(|(dst, src)| dst.set(src)).is_some())
395            // Finally, count how many `true`s the iterator yields. Upon hitting
396            // the first `false`, we instead return `None`.
397            .try_fold(0, |acc, val| if val { Some(acc + 1) } else { None });
398
399        if let Some(bytes_written) = bytes_written_or_out_of_space {
400            // We did have sufficient space to append this chunk to the
401            // slice. Update the offset contained in the header.
402            let previous_offset = header.offset;
403
404            header.offset += bytes_written;
405            self.write_header(header)?;
406
407            Ok((previous_offset == 0 && bytes_written != 0, header.offset))
408        } else {
409            // We don't have sufficient space to append this chunk to the slice.
410            // Do not update header.offset, but set header.exceeded:
411            header.exceeded = true;
412            self.write_header(header)?;
413            Err(ErrorCode::SIZE)
414        }
415    }
416}
417
418#[cfg(test)]
419mod tests {
420    use super::StreamingProcessSlice;
421    use crate::processbuffer::WriteableProcessSlice;
422    use crate::ErrorCode;
423
424    #[test]
425    fn test_empty_process_slice() {
426        let process_slice: &WriteableProcessSlice = (&mut [][..]).into();
427        let s = StreamingProcessSlice::new(process_slice);
428
429        assert_eq!(s.append_chunk(b"The cake is a lie."), Err(ErrorCode::SIZE));
430        assert_eq!(
431            s.append_chunk_from_iter(b"The cake is a lie.".iter().copied()),
432            Err(ErrorCode::SIZE)
433        );
434    }
435
436    #[test]
437    fn test_header_only_process_slice() {
438        let mut buffer = [0_u8; 8];
439        let process_slice: &WriteableProcessSlice = (&mut buffer[..]).into();
440
441        let s = StreamingProcessSlice::new(process_slice);
442        let hdr = s.get_header().unwrap();
443        assert_eq!(hdr.version, 0);
444        assert_eq!(hdr.offset, 0);
445        assert_eq!(hdr.halt, false);
446        assert_eq!(hdr.exceeded, false);
447
448        assert_eq!(s.append_chunk(b""), Ok((false, 0)));
449        let hdr = s.get_header().unwrap();
450        assert_eq!(hdr.version, 0);
451        assert_eq!(hdr.offset, 0);
452        assert_eq!(hdr.halt, false);
453        assert_eq!(hdr.exceeded, false);
454
455        assert_eq!(
456            s.append_chunk_from_iter(b"".iter().copied()),
457            Ok((false, 0))
458        );
459        let hdr = s.get_header().unwrap();
460        assert_eq!(hdr.version, 0);
461        assert_eq!(hdr.offset, 0);
462        assert_eq!(hdr.halt, false);
463        assert_eq!(hdr.exceeded, false);
464
465        let prev_hdr = s.get_header().unwrap();
466        assert_eq!(s.append_chunk(b"The cake is a lie."), Err(ErrorCode::SIZE));
467        let hdr = s.get_header().unwrap();
468        assert_eq!(hdr.version, 0);
469        assert_eq!(hdr.offset, 0);
470        assert_eq!(hdr.halt, false);
471        assert_eq!(hdr.exceeded, true);
472
473        // Reset the header:
474        s.write_header(prev_hdr).unwrap();
475        let hdr = s.get_header().unwrap();
476        assert_eq!(prev_hdr, hdr);
477
478        assert_eq!(
479            s.append_chunk_from_iter(b"The cake is a lie.".iter().copied()),
480            Err(ErrorCode::SIZE)
481        );
482        let hdr = s.get_header().unwrap();
483        assert_eq!(hdr.version, 0);
484        assert_eq!(hdr.offset, 0);
485        assert_eq!(hdr.halt, false);
486        assert_eq!(hdr.exceeded, true);
487    }
488}