kernel/utilities/streaming_process_slice.rs
1// Licensed under the Apache License, Version 2.0 or the MIT License.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3// Copyright Tock Contributors 2024.
4
5//! Module containing the [`StreamingProcessSlice`] abstraction and
6//! related infrastructure. See the documentation on
7//! [`StreamingProcessSlice`] for more information.
8
9use core::ops::{Range, RangeFrom};
10
11use crate::processbuffer::WriteableProcessSlice;
12use crate::utilities::registers::{register_bitfields, LocalRegisterCopy};
13use crate::ErrorCode;
14
15/// A wrapper around a [`WriteableProcessSlice`] for streaming data from the
16/// kernel to a userspace process.
17///
18/// Applications like ADC sampling or network stacks require the kernel to
19/// provide a process with a continuous, lossless stream of data from a source
20/// that is not rate-controlled by the process. This wrapper implements the
21/// kernel-side of a simple protocol to achieve this goal, without requiring
22/// kernel-side buffering and by utilizing the atomic swap semantics of Tock's
23/// `allow` system call. The protocol is versioned; the semantics for version 0
24/// are as follows:
25///
26/// 1. To receive a data stream from the kernel, a userspace process allocates
27/// two buffers.
28///
29/// 2. The first buffer is prepared according to the format below. The `flags`
30/// field's version bits are set to `0`. The process clears the `exceeded`
31/// flag. It may set or clear the `halt` flag. All reserved flags must be set
32/// to `0`. Finally, the `offset` bytes (interpreted as a u32 value in native
33/// endianness) are set to `0`.
34///
35/// 3. The process `allow`s this buffer to a kernel driver.
36///
37/// 4. The kernel driver writes incoming data starting at the `data` field +
38/// `offset` bytes. After each write, the kernel increments `offset` by the
39/// number of bytes written.
40///
41/// For each *chunk* written to the buffer (where a *chunk* is an
42/// application-defined construct, such as a network packet), the kernel only
43/// increments `offset` if the full chunk was successfully written into the
44/// buffer. The kernel may or may not modify any data after the current
45/// `offset` value, regardless of whether any header fields were updated. The
46/// kernel never modifies any data in the region of
47/// `[data.start; data.start + offset)`.
48///
49/// Should the write of a chunk fail because the buffer has insufficient
50/// space left, the kernel will set the `exceeded` flag bit (index 0).
51///
52/// The `halt` flag bit as set by the process governs the kernel's behavior
53/// once the `exceeded` flag is set: if `halt` is cleared, the kernel will
54/// attempt to write future, smaller chunks to the buffer (and thus implicitly
55/// discarding some packets). If `halt` and `exceeded` are both set, the
56/// kernel will stop writing any data into the buffer.
57///
58/// 5. The kernel will schedule an upcall to the process, indicating that a
///    write to the buffer (or setting the `exceeded` flag) occurred. The kernel
60/// may schedule only one upcall for the first chunk written to the buffer,
61/// or multiple upcalls (e.g., one upcall per chunk written). A process must
62/// not rely on the number of upcalls received and instead rely on the buffer
63/// header (`offset` and the `flags` bits) to determine the amount of data
64/// written to the buffer.
65///
66/// 6. The process prepares its second buffer, following step 2. The process
67/// then issues an `allow` operation that atomically swaps the current
68/// allowed buffer by its second buffer.
69///
70/// 7. The process can now process the received chunks contained in the initial
71/// buffer, while the kernel receives new chunks in the other, newly allowed
72/// buffer.
73///
74/// As the kernel cannot track if an `allow`ed buffer for a particular
75/// [`SyscallDriver`](crate::syscall_driver::SyscallDriver) implementation is intended to be a
76/// [`StreamingProcessSlice`], the kernel must use the header in the buffer as
77/// provided by the process. The implementation of [`StreamingProcessSlice`]
78/// ensures that an incorrect header will not cause a panic, but incoming
79/// packets could be dropped. A process using a syscall API that uses a
80/// [`StreamingProcessSlice`] must ensure it has properly initialized the header
81/// before `allow`ing the buffer.
82///
83/// The version 0 buffer format is specified as follows:
84/// ```text,ignore
85/// 0 2 4 6 8
86/// +-----------+-----------+-----------------------+----------...
87/// | version | flags | write offset (32 bit) | data
88/// +-----------+-----------+-----------------------+----------...
89/// | 000...000 | x{14},H,E | <native endian u32> |
90/// +-----------+-----------+-----------------------+----------...
91/// ```
92///
93/// The `version` field is a u16 integer stored in the target's native
94/// endianness. The `flags` field is a bitfield laid out as shown in the
95/// diagram above (laid out in big endian, with `E` being the least significant
96/// bit at byte 3). The `offset` field is a u32 integer stored in the target's
97/// native endianness.
98///
99/// The kernel does not impose any alignment restrictions on
100/// `StreamingProcessSlice`s of version 0.
101///
/// The flags field is structured as follows (the `version` field precedes it
/// as a separate native-endian u16; this kernel only supports version `0`):
104/// - `H`: `halt` flag. If this flag is set and the `exceeded` flag is set, the
105/// kernel will not write any further data to this buffer.
106/// - `E`: `exceeded` flag. The kernel sets this flag when the remaining buffer
107/// capacity is insufficient to append the current chunk.
108/// - `x{14}`: reserved flag bits. Unless specified otherwise, processes must clear
109/// these flags prior to `allow`ing a buffer to the kernel. A kernel that does
110/// not know of a reserved flag must refuse to operate on a buffer that has
111/// such a flag set.
#[repr(transparent)]
pub struct StreamingProcessSlice<'a> {
    // The wrapped process slice. The version-0 header described above
    // occupies its first 8 bytes; payload data follows from byte 8 onwards.
    slice: &'a WriteableProcessSlice,
}
116
register_bitfields![
    u16,
    // Layout of the 16-bit `flags` header field (stored big-endian in the
    // buffer): bit 0 is `EXCEEDED`, bit 1 is `HALT`, bits 2..=15 are
    // reserved and must be zero for this kernel version.
    pub StreamingProcessSliceFlags [
        RESERVED OFFSET(2) NUMBITS(14) [
            // Only the all-zero reserved-bits pattern is accepted;
            // `get_header` rejects buffers with any reserved bit set.
            RESERVED0 = 0x00,
        ],
        HALT OFFSET(1) NUMBITS(1) [],
        EXCEEDED OFFSET(0) NUMBITS(1) [],
    ]
];
127
/// Fields in the `StreamingProcessSlice` buffer header.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamingProcessSliceHeader {
    /// Protocol version. This kernel only supports version `0`.
    pub version: u16,
    /// Process-controlled flag: together with `exceeded`, instructs the
    /// kernel to stop writing any further data into the buffer.
    pub halt: bool,
    /// Kernel-controlled flag: set when a chunk did not fit into the
    /// remaining buffer capacity.
    pub exceeded: bool,
    /// Number of payload bytes written so far (stored as a native-endian
    /// u32 in the buffer).
    pub offset: u32,
}
136
137impl<'a> StreamingProcessSlice<'a> {
138 const RANGE_VERSION: Range<usize> = 0..2;
139 const RANGE_FLAGS: Range<usize> = (Self::RANGE_VERSION.end)..(Self::RANGE_VERSION.end + 2);
140 const RANGE_OFFSET: Range<usize> = (Self::RANGE_FLAGS.end)..(Self::RANGE_FLAGS.end + 4);
141 const RANGE_DATA: RangeFrom<usize> = (Self::RANGE_OFFSET.end)..;
142
143 pub fn new(slice: &'a WriteableProcessSlice) -> StreamingProcessSlice<'a> {
144 StreamingProcessSlice { slice }
145 }
146
147 /// Checks whether the buffer is valid (of sufficient size to contain at
148 /// least the `flags` and `offset` fields), and extract the `flags` and
149 /// `offset` field values.
150 ///
151 /// This function fails with
152 /// - `INVAL`: if the version is not `0`, or the reserved flags are not
153 /// cleared.
154 /// - `SIZE`: if the underlying slice is not large enough to fit the
155 /// flags field and the offset field.
156 fn get_header(&self) -> Result<StreamingProcessSliceHeader, ErrorCode> {
157 let mut version_bytes = [0_u8; 2];
158 self.slice
159 .get(Self::RANGE_VERSION)
160 .ok_or(ErrorCode::SIZE)?
161 .copy_to_slice_or_err(&mut version_bytes)
162 .map_err(|_| ErrorCode::SIZE)?;
163
164 let version = u16::from_be_bytes(version_bytes);
165 if version != 0 {
166 return Err(ErrorCode::INVAL);
167 }
168
169 let mut flags_bytes = [0_u8; 2];
170 self.slice
171 .get(Self::RANGE_FLAGS)
172 .ok_or(ErrorCode::SIZE)?
173 .copy_to_slice_or_err(&mut flags_bytes)
174 .map_err(|_| ErrorCode::SIZE)?;
175
176 let flags: LocalRegisterCopy<u16, StreamingProcessSliceFlags::Register> =
177 LocalRegisterCopy::new(u16::from_be_bytes(flags_bytes));
178
179 if flags.read_as_enum(StreamingProcessSliceFlags::RESERVED)
180 != Some(StreamingProcessSliceFlags::RESERVED::Value::RESERVED0)
181 {
182 return Err(ErrorCode::INVAL);
183 }
184
185 let mut offset_bytes = [0_u8; Self::RANGE_OFFSET.end - Self::RANGE_OFFSET.start];
186 self.slice
187 .get(Self::RANGE_OFFSET)
188 .ok_or(ErrorCode::SIZE)?
189 .copy_to_slice_or_err(&mut offset_bytes)
190 .map_err(|_| ErrorCode::SIZE)?;
191
192 Ok(StreamingProcessSliceHeader {
193 version,
194 halt: flags.read(StreamingProcessSliceFlags::HALT) != 0,
195 exceeded: flags.read(StreamingProcessSliceFlags::EXCEEDED) != 0,
196 offset: u32::from_ne_bytes(offset_bytes),
197 })
198 }
199
200 /// Write updated header (`version`, `flags` and `offset`) back to the
201 /// underlying slice.
202 ///
203 /// This function does not perform any sanity checks on the `header`
204 /// argument. In particular, users of this function must ensure that they
205 /// previously extracted the written-back [`StreamingProcessSliceHeader`]
206 /// argument from the buffer, do not modify the version, do not change any
207 /// flags that are controlled by the process or otherwise violate the
208 /// protocol, and correctly increment the `offset` value.
209 ///
210 /// - `SIZE`: if the underlying slice is not large enough to fit the
211 /// flags field and the offset field.
212 fn write_header(&self, header: StreamingProcessSliceHeader) -> Result<(), ErrorCode> {
213 // Write the offset first, to avoid modifying the buffer if it's too
214 // small to fit the offset, but large enough to hold the flags byte:
215 let offset_bytes = u32::to_ne_bytes(header.offset);
216 self.slice
217 .get(Self::RANGE_OFFSET)
218 .ok_or(ErrorCode::SIZE)?
219 .copy_from_slice_or_err(&offset_bytes)
220 .map_err(|_| ErrorCode::SIZE)?;
221
222 let version_bytes: [u8; 2] = u16::to_ne_bytes(header.version);
223 self.slice
224 .get(Self::RANGE_VERSION)
225 .ok_or(ErrorCode::SIZE)?
226 .copy_from_slice_or_err(&version_bytes)
227 .map_err(|_| ErrorCode::SIZE)?;
228
229 let flags_bytes: [u8; 2] = u16::to_be_bytes(
230 (StreamingProcessSliceFlags::RESERVED::RESERVED0
231 + StreamingProcessSliceFlags::HALT.val(header.halt as u16)
232 + StreamingProcessSliceFlags::EXCEEDED.val(header.exceeded as u16))
233 .value,
234 );
235 self.slice
236 .get(Self::RANGE_FLAGS)
237 .ok_or(ErrorCode::SIZE)?
238 .copy_from_slice_or_err(&flags_bytes)
239 .map_err(|_| ErrorCode::SIZE)?;
240
241 Ok(())
242 }
243
244 /// Access the payload data portion of the underlying slice.
245 ///
246 /// This method does not perform any validation of the buffer version or
247 /// data. It must only be used on slices of version 0. If the underlying
248 /// slice is too small to hold the header fields, this will return an empty
249 /// slice.
250 fn payload_slice(&self) -> &WriteableProcessSlice {
251 self.slice
252 .get(Self::RANGE_DATA)
253 .unwrap_or((&mut [][..]).into())
254 }
255
256 /// Append a chunk of data to the slice.
257 ///
258 /// If the underlying slice has a correct `flags` and `offset` value, is not
259 /// halted, and has sufficient space for this `data` chunk, this function
260 /// returns the updated buffer offset (set to one past the last written
261 /// byte).
262 ///
263 /// This function returns whether this chunk was the first non-zero-length
264 /// `chunk` appended to the slice, and the offset after the append operation
265 /// (where the next chunk would be written in the data section).
266 ///
267 /// This function fails with:
268 /// - `INVAL`: if the version is not `0`, or the reserved flags are not
269 /// cleared.
270 /// - `BUSY`: if both the `halt` and `exceeded` flags are set. In this case,
271 /// the slice will not be modified.
272 /// - `SIZE`: if the underlying slice is not large enough to fit the
273 /// flags field and the offset field. In this case, the
274 /// `exceeded` flag will be set and the slice will not be modified.
275 /// - `FAIL`: would need to increment offset beyond 2**32 - 1. Neither the
276 /// payload slice nor any header fields will be modified.
277 ///
278 /// Appending a zero-length `chunk` will be treated as every other chunk,
279 /// but appending it will not set the exceeded flag, even if `offset` is at
280 /// the maximum position for this buffer. A zero-length append operation can
281 /// still fail due to the buffer being halted, having an improper header,
282 /// etc. A zero-length `chunk` will never be treated as the first chunk
283 /// appended to a buffer.
284 pub fn append_chunk(&self, chunk: &[u8]) -> Result<(bool, u32), ErrorCode> {
285 // This includes general sanity checks:
286 let mut header = self.get_header()?;
287
288 // Check whether we are instructed to halt:
289 if header.exceeded && header.halt {
290 return Err(ErrorCode::BUSY);
291 }
292
293 let previous_offset = header.offset;
294
295 let new_offset: u32 = (previous_offset as usize)
296 .checked_add(chunk.len())
297 .ok_or(ErrorCode::FAIL)?
298 .try_into()
299 .map_err(|_| ErrorCode::FAIL)?;
300
301 // Attempt to append the chunk to the slice, otherwise fail with SIZE:
302 if let Some(dst) = self
303 .payload_slice()
304 .get((previous_offset as usize)..(new_offset as usize))
305 {
306 // We do have sufficient space to append this chunk to the slice:
307 dst.copy_from_slice(chunk);
308 header.offset = new_offset;
309 self.write_header(header)?;
310 Ok((previous_offset == 0 && chunk.len() != 0, new_offset))
311 } else {
312 // We don't have sufficient space to append this chunk to the slice.
313 // Do not update header.offset, but set header.exceeded:
314 header.exceeded = true;
315 self.write_header(header)?;
316 Err(ErrorCode::SIZE)
317 }
318 }
319
320 /// Append a chunk of data from an iterator.
321 ///
322 /// If the underlying slice has a correct `flags` and `offset` value, is not
323 /// halted, and has sufficient space for this `data` chunk, this function
324 /// returns the updated buffer offset (set to one past the last written
325 /// byte).
326 ///
327 /// This function returns whether this chunk was the first non-zero-length
328 /// `chunk` appended to the slice, and the offset after the append operation
329 /// (where the next chunk would be written in the data section).
330 ///
331 /// If the buffer does not have enough space, this function will still
332 /// partially copy this chunk and modify the slice payload data after
333 /// `offset`. It will not update the `offset` header field though, and
334 /// instead set the `exceeded` flag.
335 ///
336 /// This function fails with:
337 /// - `INVAL`: if the version is not `0`, or the reserved flags are not
338 /// cleared.
339 /// - `BUSY`: if both the `halt` and `exceeded` flags are set. In this case,
340 /// the slice will not be modified.
341 /// - `SIZE`: if the underlying slice is not large enough to fit the
342 /// flags field and the offset field. In this case, the
343 /// `exceeded` flag will be set and the slice will not be modified.
344 /// - `FAIL`: would need to increment offset beyond 2**32 - 1. Neither the
345 /// payload slice nor any header fields will be modified.
346 ///
347 /// Appending a zero-length `chunk` will be treated as every other chunk,
348 /// but appending it will not set the exceeded flag, even if `offset` is at
349 /// the maximum position for this buffer. A zero-length append operation can
350 /// still fail due to the buffer being halted, having an improper header,
351 /// etc. A zero-length `chunk` will never be treated as the first chunk
352 /// appended to a buffer.
353 pub fn append_chunk_from_iter<I: IntoIterator<Item = u8>>(
354 &self,
355 src: I,
356 ) -> Result<(bool, u32), ErrorCode> {
357 // This includes general sanity checks:
358 let mut header = self.get_header()?;
359
360 // Check whether we are instructed to halt:
361 if header.exceeded && header.halt {
362 return Err(ErrorCode::BUSY);
363 }
364
365 // Create a subslice over the remaining payload space:
366 let remaining_payload_slice = self
367 .payload_slice()
368 .get((header.offset as usize)..)
369 // If the iterator yields 0 elements, even if the offset
370 // lies at the end or outside of the payload slice, we
371 // still don't want to return an error.
372 .unwrap_or((&mut [][..]).into());
373
374 // Create a mutable iterator over the remaining payload space:
375 let mut remaining_payload_iter = remaining_payload_slice.iter();
376
377 // We don't know how many bytes the `src` iterator will yield. Try to
378 // copy from it and abort if we run out of space on the payload iter.
379 //
380 // We don't use `zip` here, as that would silently truncate the `src`
381 // iter if the `payload` iter runs out of elements.
382 let bytes_written_or_out_of_space = src
383 .into_iter()
384 // Combine this byte with one of the payload slice. This is
385 // different from `zip` in that we keep iterating even if we hit
386 // `None` on the payload iter:
387 .map(|src_byte| {
388 remaining_payload_iter
389 .next()
390 .map(|payload_byte| (payload_byte, src_byte))
391 })
392 // If we managed to get a `Some(Cell<u8>)`, write a byte from the
393 // `src` to the payload slice and return `true`, else `false`.
394 .map(|opt| opt.map(|(dst, src)| dst.set(src)).is_some())
395 // Finally, count how many `true`s the iterator yields. Upon hitting
396 // the first `false`, we instead return `None`.
397 .try_fold(0, |acc, val| if val { Some(acc + 1) } else { None });
398
399 if let Some(bytes_written) = bytes_written_or_out_of_space {
400 // We did have sufficient space to append this chunk to the
401 // slice. Update the offset contained in the header.
402 let previous_offset = header.offset;
403
404 header.offset += bytes_written;
405 self.write_header(header)?;
406
407 Ok((previous_offset == 0 && bytes_written != 0, header.offset))
408 } else {
409 // We don't have sufficient space to append this chunk to the slice.
410 // Do not update header.offset, but set header.exceeded:
411 header.exceeded = true;
412 self.write_header(header)?;
413 Err(ErrorCode::SIZE)
414 }
415 }
416}
417
#[cfg(test)]
mod tests {
    use super::StreamingProcessSlice;
    use crate::processbuffer::WriteableProcessSlice;
    use crate::ErrorCode;

    #[test]
    fn test_empty_process_slice() {
        // A zero-length slice cannot even hold the 8-byte header, so every
        // append attempt must fail with SIZE:
        let process_slice: &WriteableProcessSlice = (&mut [][..]).into();
        let streaming = StreamingProcessSlice::new(process_slice);

        let chunk = b"The cake is a lie.";
        assert_eq!(streaming.append_chunk(chunk), Err(ErrorCode::SIZE));
        assert_eq!(
            streaming.append_chunk_from_iter(chunk.iter().copied()),
            Err(ErrorCode::SIZE)
        );
    }

    #[test]
    fn test_header_only_process_slice() {
        // An all-zero 8-byte buffer is a valid, empty version-0 header with
        // no payload space at all:
        let mut buffer = [0_u8; 8];
        let process_slice: &WriteableProcessSlice = (&mut buffer[..]).into();
        let streaming = StreamingProcessSlice::new(process_slice);

        let hdr = streaming.get_header().unwrap();
        assert!(hdr.version == 0 && hdr.offset == 0 && !hdr.halt && !hdr.exceeded);

        // Zero-length appends succeed and leave the header untouched:
        assert_eq!(streaming.append_chunk(b""), Ok((false, 0)));
        let hdr = streaming.get_header().unwrap();
        assert!(hdr.version == 0 && hdr.offset == 0 && !hdr.halt && !hdr.exceeded);

        assert_eq!(
            streaming.append_chunk_from_iter(b"".iter().copied()),
            Ok((false, 0))
        );
        let hdr = streaming.get_header().unwrap();
        assert!(hdr.version == 0 && hdr.offset == 0 && !hdr.halt && !hdr.exceeded);

        // A non-empty chunk cannot fit and must set the `exceeded` flag:
        let pristine_hdr = streaming.get_header().unwrap();
        assert_eq!(
            streaming.append_chunk(b"The cake is a lie."),
            Err(ErrorCode::SIZE)
        );
        let hdr = streaming.get_header().unwrap();
        assert!(hdr.version == 0 && hdr.offset == 0 && !hdr.halt && hdr.exceeded);

        // Reset the header to its pre-append state:
        streaming.write_header(pristine_hdr).unwrap();
        assert_eq!(streaming.get_header().unwrap(), pristine_hdr);

        // The iterator-based append behaves identically:
        assert_eq!(
            streaming.append_chunk_from_iter(b"The cake is a lie.".iter().copied()),
            Err(ErrorCode::SIZE)
        );
        let hdr = streaming.get_header().unwrap();
        assert!(hdr.version == 0 && hdr.offset == 0 && !hdr.halt && hdr.exceeded);
    }
}
488}