kernel/utilities/
streaming_process_slice.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
// Licensed under the Apache License, Version 2.0 or the MIT License.
// SPDX-License-Identifier: Apache-2.0 OR MIT
// Copyright Tock Contributors 2024.

//! Module containing the [`StreamingProcessSlice`] abstraction and
//! related infrastructure. See the documentation on
//! [`StreamingProcessSlice`] for more information.

use core::ops::{Range, RangeFrom};

use crate::processbuffer::WriteableProcessSlice;
use crate::utilities::registers::{register_bitfields, LocalRegisterCopy};
use crate::ErrorCode;

/// A wrapper around a [`WriteableProcessSlice`] for streaming data from the
/// kernel to a userspace process.
///
/// Applications like ADC sampling or network stacks require the kernel to
/// provide a process with a continuous, lossless stream of data from a source
/// that is not rate-controlled by the process. This wrapper implements the
/// kernel-side of a simple protocol to achieve this goal, without requiring
/// kernel-side buffering and by utilizing the atomic swap semantics of Tock's
/// `allow` system call. The protocol is versioned; the semantics for version 0
/// are as follows:
///
/// 1. To receive a data stream from the kernel, a userspace process allocates
///    two buffers.
///
/// 2. The first buffer is prepared according to the format below. The `flags`
///    field's version bits are set to `0`. The process clears the `exceeded`
///    flag. It may set or clear the `halt` flag. All reserved flags must be set
///    to `0`. Finally, the `offset` bytes (interpreted as a u32 value in native
///    endianness) are set to `0`.
///
/// 3. The process `allow`s this buffer to a kernel driver.
///
/// 4. The kernel driver writes incoming data starting at the `data` field +
///    `offset` bytes. After each write, the kernel increments `offset` by the
///    number of bytes written.
///
///    For each *chunk* written to the buffer (where a *chunk* is an
///    application-defined construct, such as a network packet), the kernel only
///    increments `offset` if the full chunk was successfully written into the
///    buffer. The kernel may or may not modify any data after the current
///    `offset` value, regardless of whether any header fields were updated. The
///    kernel never modifies any data in the region of
///    `[data.start; data.start + offset)`.
///
///    Should the write of a chunk fail because the buffer has insufficient
///    space left, the kernel will set the `exceeded` flag bit (index 0).
///
///    The `halt` flag bit as set by the process governs the kernel's behavior
///    once the `exceeded` flag is set: if `halt` is cleared, the kernel will
///    attempt to write future, smaller chunks to the buffer (and thus implicitly
///    discarding some packets). If `halt` and `exceeded` are both set, the
///    kernel will stop writing any data into the buffer.
///
/// 5. The kernel will schedule an upcall to the process, indicating that a
///    write to the buffer (or setting the `exceeded`) flag occurred. The kernel
///    may schedule only one upcall for the first chunk written to the buffer,
///    or multiple upcalls (e.g., one upcall per chunk written). A process must
///    not rely on the number of upcalls received and instead rely on the buffer
///    header (`offset` and the `flags` bits) to determine the amount of data
///    written to the buffer.
///
/// 6. The process prepares its second buffer, following step 2. The process
///    then issues an `allow` operation that atomically swaps the current
///    allowed buffer by its second buffer.
///
/// 7. The process can now process the received chunks contained in the initial
///    buffer, while the kernel receives new chunks in the other, newly allowed
///    buffer.
///
/// As the kernel cannot track if an `allow`ed buffer for a particular
/// [`SyscallDriver`](crate::syscall_driver::SyscallDriver) implementation is intended to be a
/// [`StreamingProcessSlice`], the kernel must use the header in the buffer as
/// provided by the process. The implementation of [`StreamingProcessSlice`]
/// ensures that an incorrect header will not cause a panic, but incoming
/// packets could be dropped. A process using a syscall API that uses a
/// [`StreamingProcessSlice`] must ensure it has properly initialized the header
/// before `allow`ing the buffer.
///
/// The version 0 buffer format is specified as follows:
/// ```text,ignore
/// 0           2           4           6           8
/// +-----------+-----------+-----------------------+----------...
/// | version   | flags     | write offset (32 bit) | data
/// +-----------+-----------+-----------------------+----------...
/// | 000...000 | x{14},H,E | <native endian u32>   |
/// +-----------+-----------+-----------------------+----------...
/// ```
///
/// The `version` field is a u16 integer stored in the target's native
/// endianness. The `flags` field is a bitfield laid out as shown in the
/// diagram above (laid out in big endian, with `E` being the least significant
/// bit at byte 3). The `offset` field is a u32 integer stored in the target's
/// native endianness.
///
/// The kernel does not impose any alignment restrictions on
/// `StreamingProcessSlice`s of version 0.
///
/// The flags field is structured as follows:
/// - `V`: version bits. This kernel only supports version `0`.
/// - `H`: `halt` flag. If this flag is set and the `exceeded` flag is set, the
///   kernel will not write any further data to this buffer.
/// - `E`: `exceeded` flag. The kernel sets this flag when the remaining buffer
///   capacity is insufficient to append the current chunk.
/// - `x{14}`: reserved flag bits. Unless specified otherwise, processes must clear
///   these flags prior to `allow`ing a buffer to the kernel. A kernel that does
///   not know of a reserved flag must refuse to operate on a buffer that has
///   such a flag set.
#[repr(transparent)]
pub struct StreamingProcessSlice<'a> {
    slice: &'a WriteableProcessSlice,
}

register_bitfields![
    u16,
    pub StreamingProcessSliceFlags [
        RESERVED OFFSET(2) NUMBITS(14) [
            RESERVED0 = 0x00,
        ],
        HALT OFFSET(1) NUMBITS(1) [],
        EXCEEDED OFFSET(0) NUMBITS(1) [],
    ]
];

/// Fields in the `StreamingProcessSlice` buffer header.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamingProcessSliceHeader {
    pub version: u16,
    pub halt: bool,
    pub exceeded: bool,
    pub offset: u32,
}

impl<'a> StreamingProcessSlice<'a> {
    const RANGE_VERSION: Range<usize> = 0..2;
    const RANGE_FLAGS: Range<usize> = (Self::RANGE_VERSION.end)..(Self::RANGE_VERSION.end + 2);
    const RANGE_OFFSET: Range<usize> = (Self::RANGE_FLAGS.end)..(Self::RANGE_FLAGS.end + 4);
    const RANGE_DATA: RangeFrom<usize> = (Self::RANGE_OFFSET.end + 1)..;

    pub fn new(slice: &'a WriteableProcessSlice) -> StreamingProcessSlice<'a> {
        StreamingProcessSlice { slice }
    }

    /// Checks whether the buffer is valid (of sufficient size to contain at
    /// least the `flags` and `offset` fields), and extract the `flags` and
    /// `offset` field values.
    ///
    /// This function fails with
    /// - `INVAL`: if the version is not `0`, or the reserved flags are not
    ///   cleared.
    /// - `SIZE`: if the underlying slice is not large enough to fit the
    ///   flags field and the offset field.
    fn get_header(&self) -> Result<StreamingProcessSliceHeader, ErrorCode> {
        let mut version_bytes = [0_u8; 2];
        self.slice
            .get(Self::RANGE_VERSION)
            .ok_or(ErrorCode::SIZE)?
            .copy_to_slice_or_err(&mut version_bytes)
            .map_err(|_| ErrorCode::SIZE)?;

        let version = u16::from_be_bytes(version_bytes);
        if version != 0 {
            return Err(ErrorCode::INVAL);
        }

        let mut flags_bytes = [0_u8; 2];
        self.slice
            .get(Self::RANGE_FLAGS)
            .ok_or(ErrorCode::SIZE)?
            .copy_to_slice_or_err(&mut flags_bytes)
            .map_err(|_| ErrorCode::SIZE)?;

        let flags: LocalRegisterCopy<u16, StreamingProcessSliceFlags::Register> =
            LocalRegisterCopy::new(u16::from_be_bytes(flags_bytes));

        if flags.read_as_enum(StreamingProcessSliceFlags::RESERVED)
            != Some(StreamingProcessSliceFlags::RESERVED::Value::RESERVED0)
        {
            return Err(ErrorCode::INVAL);
        }

        let mut offset_bytes = [0_u8; Self::RANGE_OFFSET.end - Self::RANGE_OFFSET.start];
        self.slice
            .get(Self::RANGE_OFFSET)
            .ok_or(ErrorCode::SIZE)?
            .copy_to_slice_or_err(&mut offset_bytes)
            .map_err(|_| ErrorCode::SIZE)?;

        Ok(StreamingProcessSliceHeader {
            version,
            halt: flags.read(StreamingProcessSliceFlags::HALT) != 0,
            exceeded: flags.read(StreamingProcessSliceFlags::EXCEEDED) != 0,
            offset: u32::from_ne_bytes(offset_bytes),
        })
    }

    /// Write updated header (`version`, `flags` and `offset`) back to the
    /// underlying slice.
    ///
    /// This function does not perform any sanity checks on the `header`
    /// argument.  In particular, users of this function must ensure that they
    /// previously extracted the written-back [`StreamingProcessSliceHeader`]
    /// argument from the buffer, do not modify the version, do not change any
    /// flags that are controlled by the process or otherwise violate the
    /// protocol, and correctly increment the `offset` value.
    ///
    /// - `SIZE`: if the underlying slice is not large enough to fit the
    ///   flags field and the offset field.
    fn write_header(&self, header: StreamingProcessSliceHeader) -> Result<(), ErrorCode> {
        // Write the offset first, to avoid modifying the buffer if it's too
        // small to fit the offset, but large enough to hold the flags byte:
        let offset_bytes = u32::to_ne_bytes(header.offset);
        self.slice
            .get(Self::RANGE_OFFSET)
            .ok_or(ErrorCode::SIZE)?
            .copy_from_slice_or_err(&offset_bytes)
            .map_err(|_| ErrorCode::SIZE)?;

        let version_bytes: [u8; 2] = u16::to_ne_bytes(header.version);
        self.slice
            .get(Self::RANGE_VERSION)
            .ok_or(ErrorCode::SIZE)?
            .copy_from_slice_or_err(&version_bytes)
            .map_err(|_| ErrorCode::SIZE)?;

        let flags_bytes: [u8; 2] = u16::to_be_bytes(
            (StreamingProcessSliceFlags::RESERVED::RESERVED0
                + StreamingProcessSliceFlags::HALT.val(header.halt as u16)
                + StreamingProcessSliceFlags::EXCEEDED.val(header.exceeded as u16))
            .value,
        );
        self.slice
            .get(Self::RANGE_FLAGS)
            .ok_or(ErrorCode::SIZE)?
            .copy_from_slice_or_err(&flags_bytes)
            .map_err(|_| ErrorCode::SIZE)?;

        Ok(())
    }

    /// Access the payload data portion of the underlying slice.
    ///
    /// This method does not perform any validation of the buffer version or
    /// data. It must only be used on slices of version 0. If the underlying
    /// slice is too small to hold the header fields, this will return an empty
    /// slice.
    fn payload_slice(&self) -> &WriteableProcessSlice {
        self.slice
            .get(Self::RANGE_DATA)
            .unwrap_or((&mut [][..]).into())
    }

    /// Append a chunk of data to the slice.
    ///
    /// If the underlying slice has a correct `flags` and `offset` value, is not
    /// halted, and has sufficient space for this `data` chunk, this function
    /// returns the updated buffer offset (set to one past the last written
    /// byte).
    ///
    /// This function returns whether this chunk was the first non-zero-length
    /// `chunk` appended to the slice, and the offset after the append operation
    /// (where the next chunk would be written in the data section).
    ///
    /// This function fails with:
    /// - `INVAL`: if the version is not `0`, or the reserved flags are not
    ///   cleared.
    /// - `BUSY`: if both the `halt` and `exceeded` flags are set. In this case,
    ///   the slice will not be modified.
    /// - `SIZE`: if the underlying slice is not large enough to fit the
    ///   flags field and the offset field. In this case, the
    ///   `exceeded` flag will be set and the slice will not be modified.
    /// - `FAIL`: would need to increment offset beyond 2**32 - 1. Neither the
    ///   payload slice nor any header fields will be modified.
    ///
    /// Appending a zero-length `chunk` will be treated as every other chunk,
    /// but appending it will not set the exceeded flag, even if `offset` is at
    /// the maximum position for this buffer. A zero-length append operation can
    /// still fail due to the buffer being halted, having an improper header,
    /// etc. A zero-length `chunk` will never be treated as the first chunk
    /// appended to a buffer.
    pub fn append_chunk(&self, chunk: &[u8]) -> Result<(bool, u32), ErrorCode> {
        // This includes general sanity checks:
        let mut header = self.get_header()?;

        // Check whether we are instructed to halt:
        if header.exceeded && header.halt {
            return Err(ErrorCode::BUSY);
        }

        let previous_offset = header.offset;

        let new_offset: u32 = (previous_offset as usize)
            .checked_add(chunk.len())
            .ok_or(ErrorCode::FAIL)?
            .try_into()
            .map_err(|_| ErrorCode::FAIL)?;

        // Attempt to append the chunk to the slice, otherwise fail with SIZE:
        if let Some(dst) = self
            .payload_slice()
            .get((previous_offset as usize)..(new_offset as usize))
        {
            // We do have sufficient space to append this chunk to the slice:
            dst.copy_from_slice(chunk);
            header.offset = new_offset;
            self.write_header(header)?;
            Ok((previous_offset == 0 && chunk.len() != 0, new_offset))
        } else {
            // We don't have sufficient space to append this chunk to the slice.
            // Do not update header.offset, but set header.exceeded:
            header.exceeded = true;
            self.write_header(header)?;
            Err(ErrorCode::SIZE)
        }
    }

    /// Append a chunk of data from an iterator.
    ///
    /// If the underlying slice has a correct `flags` and `offset` value, is not
    /// halted, and has sufficient space for this `data` chunk, this function
    /// returns the updated buffer offset (set to one past the last written
    /// byte).
    ///
    /// This function returns whether this chunk was the first non-zero-length
    /// `chunk` appended to the slice, and the offset after the append operation
    /// (where the next chunk would be written in the data section).
    ///
    /// If the buffer does not have enough space, this function will still
    /// partially copy this chunk and modify the slice payload data after
    /// `offset`. It will not update the `offset` header field though, and
    /// instead set the `exceeded` flag.
    ///
    /// This function fails with:
    /// - `INVAL`: if the version is not `0`, or the reserved flags are not
    ///   cleared.
    /// - `BUSY`: if both the `halt` and `exceeded` flags are set. In this case,
    ///   the slice will not be modified.
    /// - `SIZE`: if the underlying slice is not large enough to fit the
    ///   flags field and the offset field. In this case, the
    ///   `exceeded` flag will be set and the slice will not be modified.
    /// - `FAIL`: would need to increment offset beyond 2**32 - 1. Neither the
    ///   payload slice nor any header fields will be modified.
    ///
    /// Appending a zero-length `chunk` will be treated as every other chunk,
    /// but appending it will not set the exceeded flag, even if `offset` is at
    /// the maximum position for this buffer. A zero-length append operation can
    /// still fail due to the buffer being halted, having an improper header,
    /// etc. A zero-length `chunk` will never be treated as the first chunk
    /// appended to a buffer.
    pub fn append_chunk_from_iter<I: IntoIterator<Item = u8>>(
        &self,
        src: I,
    ) -> Result<(bool, u32), ErrorCode> {
        // This includes general sanity checks:
        let mut header = self.get_header()?;

        // Check whether we are instructed to halt:
        if header.exceeded && header.halt {
            return Err(ErrorCode::BUSY);
        }

        // Create a subslice over the remaining payload space:
        let remaining_payload_slice = self
            .payload_slice()
            .get((header.offset as usize)..)
            // If the iterator yields 0 elements, even if the offset
            // lies at the end or outside of the payload slice, we
            // still don't want to return an error.
            .unwrap_or((&mut [][..]).into());

        // Create a mutable iterator over the remaining payload space:
        let mut remaining_payload_iter = remaining_payload_slice.iter();

        // We don't know how many bytes the `src` iterator will yield. Try to
        // copy from it and abort if we run out of space on the payload iter.
        //
        // We don't use `zip` here, as that would silently truncate the `src`
        // iter if the `payload` iter runs out of elements.
        let bytes_written_or_out_of_space = src
            .into_iter()
            // Combine this byte with one of the payload slice. This is
            // different from `zip` in that we keep iterating even if we hit
            // `None` on the payload iter:
            .map(|src_byte| {
                remaining_payload_iter
                    .next()
                    .map(|payload_byte| (payload_byte, src_byte))
            })
            // If we managed to get a `Some(Cell<u8>)`, write a byte from the
            // `src` to the payload slice and return `true`, else `false`.
            .map(|opt| opt.map(|(dst, src)| dst.set(src)).is_some())
            // Finally, count how many `true`s the iterator yields. Upon hitting
            // the first `false`, we instead return `None`.
            .try_fold(0, |acc, val| if val { Some(acc + 1) } else { None });

        if let Some(bytes_written) = bytes_written_or_out_of_space {
            // We did have sufficient space to append this chunk to the
            // slice. Update the offset contained in the header.
            let previous_offset = header.offset;

            header.offset += bytes_written;
            self.write_header(header)?;

            Ok((previous_offset == 0 && bytes_written != 0, header.offset))
        } else {
            // We don't have sufficient space to append this chunk to the slice.
            // Do not update header.offset, but set header.exceeded:
            header.exceeded = true;
            self.write_header(header)?;
            Err(ErrorCode::SIZE)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::StreamingProcessSlice;
    use crate::processbuffer::WriteableProcessSlice;
    use crate::ErrorCode;

    #[test]
    fn test_empty_process_slice() {
        let process_slice: &WriteableProcessSlice = (&mut [][..]).into();
        let s = StreamingProcessSlice::new(process_slice);

        assert_eq!(s.append_chunk(b"The cake is a lie."), Err(ErrorCode::SIZE));
        assert_eq!(
            s.append_chunk_from_iter(b"The cake is a lie.".iter().copied()),
            Err(ErrorCode::SIZE)
        );
    }

    #[test]
    fn test_header_only_process_slice() {
        let mut buffer = [0_u8; 8];
        let process_slice: &WriteableProcessSlice = (&mut buffer[..]).into();

        let s = StreamingProcessSlice::new(process_slice);
        let hdr = s.get_header().unwrap();
        assert_eq!(hdr.version, 0);
        assert_eq!(hdr.offset, 0);
        assert_eq!(hdr.halt, false);
        assert_eq!(hdr.exceeded, false);

        assert_eq!(s.append_chunk(b""), Ok((false, 0)));
        let hdr = s.get_header().unwrap();
        assert_eq!(hdr.version, 0);
        assert_eq!(hdr.offset, 0);
        assert_eq!(hdr.halt, false);
        assert_eq!(hdr.exceeded, false);

        assert_eq!(
            s.append_chunk_from_iter(b"".iter().copied()),
            Ok((false, 0))
        );
        let hdr = s.get_header().unwrap();
        assert_eq!(hdr.version, 0);
        assert_eq!(hdr.offset, 0);
        assert_eq!(hdr.halt, false);
        assert_eq!(hdr.exceeded, false);

        let prev_hdr = s.get_header().unwrap();
        assert_eq!(s.append_chunk(b"The cake is a lie."), Err(ErrorCode::SIZE));
        let hdr = s.get_header().unwrap();
        assert_eq!(hdr.version, 0);
        assert_eq!(hdr.offset, 0);
        assert_eq!(hdr.halt, false);
        assert_eq!(hdr.exceeded, true);

        // Reset the header:
        s.write_header(prev_hdr).unwrap();
        let hdr = s.get_header().unwrap();
        assert_eq!(prev_hdr, hdr);

        assert_eq!(
            s.append_chunk_from_iter(b"The cake is a lie.".iter().copied()),
            Err(ErrorCode::SIZE)
        );
        let hdr = s.get_header().unwrap();
        assert_eq!(hdr.version, 0);
        assert_eq!(hdr.offset, 0);
        assert_eq!(hdr.halt, false);
        assert_eq!(hdr.exceeded, true);
    }
}