roverd.channels

Channel types.

Info

The core API is implemented and documented by the base type Channel.

Available Types

The following channel types are provided:

| Name | Class | Description |
| --- | --- | --- |
| `raw` | `RawChannel` | Little-endian raw byte array |
| `lzma` | `LzmaChannel` | LZMA-compressed raw data |
| `lzmaf` | `LzmaFrameChannel` | LZMA, but each frame is compressed independently |
| `mjpg` | `VideoChannel` | MJPEG video |
| `npz` | `NPZBlobChannel` | Sequence of `.npz` files, one per blob |
| `jpg` | `JPEGBlobChannel` | Sequence of `.jpg` files, one per blob |

Configuration

Channels are conventionally specified by a configuration dict with the following fields:

  • format: channel data format; one of the names in the table above (e.g. raw, lzmaf).
  • type: data type, using numpy size-in-bytes convention (e.g. u2 for 2-byte/16-bit unsigned integer)
  • shape: shape of the non-time dimensions.
  • desc: description of the channel; should be human-readable, and is not intended for use by scripts.

Use from_config to create a channel from a provided configuration.

Sample Configuration
{
    "format": "lzmaf", "type": "u1", "shape": [64, 128],
    "desc": "range-azimuth BEV from 3D polar occupancy"
}
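
For example, a channel could be built from this configuration roughly as follows. This is a minimal sketch: the exact import location and signature of from_config are not shown on this page, so the path keyword below is an assumption.

```python
from roverd.channels import from_config  # assumed import location

config = {
    "format": "lzmaf", "type": "u1", "shape": [64, 128],
    "desc": "range-azimuth BEV from 3D polar occupancy",
}
# `format="lzmaf"` selects `LzmaFrameChannel` per the table above; the
# `path` keyword is hypothetical and may differ in the actual signature.
channel = from_config(config, path="data/bev")
```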

roverd.channels.JPEGBlobChannel

Bases: BlobChannel

Blob channel consisting of .jpg files.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str` | file path. | *required* |
| `dtype` | `str \| type \| dtype` | data type, or string name of dtype (e.g. `u1`, `f4`). | *required* |
| `shape` | `Sequence[int]` | data shape. | *required* |
| `workers` | `int` | maximum number of worker threads to use for I/O. | `8` |
| `length` | `int \| None` | number of blobs, potentially calculated in some more efficient way. If `None`, will be calculated by counting files in the directory. | `None` |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `path` | `str` | file path. |
| `type` | `dtype` | numpy data type. |
| `shape` | `tuple[int, ...]` | sample data shape. |
| `size` | `int` | total file size, in bytes. |

Source code in format/src/roverd/channels/blob.py
class JPEGBlobChannel(BlobChannel):
    """Blob channel consisting of `.jpg` files.

    Args:
        path: file path.
        dtype: data type, or string name of dtype (e.g. `u1`, `f4`).
        shape: data shape.
        workers: maximum number of worker threads to use for I/O.
        length: number of blobs, potentially calculated in some more
            efficient way. If `None`, will be calculated by counting files
            in the directory.

    Attributes:
        path: file path.
        type: numpy data type.
        shape: sample data shape.
        size: total file size, in bytes.
    """

    @cached_property
    def _cv2_module(self):
        try:
            import cv2
            return cv2
        except ImportError:
            raise ImportError(
                "Could not import cv2. `opencv-python` or "
                "`opencv-python-headless` must be installed in order to use "
                "video encoding or decoding.")

    def _read_blob(self, index: int) -> np.ndarray:
        filename = self._filename(index) + ".jpg"
        if not os.path.exists(filename):
            raise IndexError(f"Blob index {index} does not exist.")

        img = self._cv2_module.imread(
            filename, self._cv2_module.IMREAD_UNCHANGED)
        if img is None:
            raise ValueError(f"Could not read image at index {index}.")
        return self._cv2_module.cvtColor(img, self._cv2_module.COLOR_BGR2RGB)

    def _write_blob(self, index: int, data: np.ndarray) -> None:
        filename = self._filename(index) + ".jpg"
        img = self._cv2_module.cvtColor(data, self._cv2_module.COLOR_RGB2BGR)
        if not self._cv2_module.imwrite(filename, img):
            raise ValueError(f"Could not write image at index {index}.")

consume

consume(
    stream: Streamable[Data | Sequence[Data]],
    thread: bool = False,
    append: bool = False,
) -> None

Consume iterable or queue and write to file.

  • If Iterable, fetches from the iterator until exhausted (i.e. until StopIteration), then returns.
  • If Queue, .get() from the Queue until None is received, then return.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `stream` | `Streamable[Data \| Sequence[Data]]` | stream to consume. | *required* |
| `thread` | `bool` | whether to run in a separate thread, returning immediately instead of blocking until complete. | `False` |
| `append` | `bool` | whether to append to existing blobs rather than overwrite them. | `False` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | data type/shape does not match channel specifications. |

Source code in format/src/roverd/channels/abstract.py
def consume(
    self, stream: Streamable[Data | Sequence[Data]], thread: bool = False,
    append: bool = False
) -> None:
    """Consume iterable or queue and write to file.

    - If `Iterable`, fetches from the iterator until exhausted (i.e. until
      `StopIteration`), then returns.
    - If `Queue`, `.get()` from the `Queue` until `None` is received, then
      return.

    Args:
        stream: stream to consume.
        thread: whether to run in a separate thread, returning immediately
            instead of blocking until complete.
        append: whether to append or overwrite existing blobs.

    Raises:
        ValueError: data type/shape does not match channel specifications.
    """
    if isinstance(stream, Queue):
        stream = Buffer(stream)
    if thread:
        Thread(target=self.consume, kwargs={
            "stream": stream, "append": append}).start()
        return

    if not append:
        self._n_blobs = 0

    os.makedirs(self.path, exist_ok=True)
    for frame in stream:
        if not isinstance(frame, np.ndarray):
            raise ValueError("BlobChannels do not allow raw data.")
        if len(frame.shape) != len(self.shape):
            raise ValueError(
                "BlobChannels do not allow batched write data.")

        self._verify_type(frame)
        self._write_blob(self._n_blobs, frame)
        self._n_blobs += 1
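
For instance, a producer can hand frames to consume through a Queue, with None as the termination sentinel (a sketch continuing the hypothetical channel above):

```python
from queue import Queue

import numpy as np

q: Queue = Queue()
channel.consume(q, thread=True)  # returns immediately; writes in background

for _ in range(100):
    q.put(np.zeros((480, 640, 3), dtype=np.uint8))  # one blob per item
q.put(None)  # sentinel: `consume` finishes after this
```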

read

read(
    start: int | integer = 0, samples: int | integer = -1
) -> Shaped[ndarray, ...]

Read data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `start` | `int \| integer` | start index to read. | `0` |
| `samples` | `int \| integer` | number of samples/frames to read. If `-1`, read all data. | `-1` |

Returns:

| Type | Description |
| --- | --- |
| `Shaped[ndarray, ...]` | Read frames as an array, with a leading axis corresponding to the number of `samples`. |

Source code in format/src/roverd/channels/abstract.py
def read(
    self, start: int | np.integer = 0, samples: int | np.integer = -1
) -> Shaped[np.ndarray, "..."]:
    """Read data.

    Args:
        start: start index to read.
        samples: number of samples/frames to read. If `-1`, read all data.

    Returns:
        Read frames as an array, with a leading axis corresponding to
            the number of `samples`.
    """
    if samples == -1:
        samples = self._n_blobs - start

    if start < 0 or start + samples > self._n_blobs:
        raise IndexError(
            f"Read indices ({start, start + samples}) out of range "
            f"(0, {self._n_blobs}).")

    with ThreadPool(int(min(self.workers, samples))) as pool:
        blobs = pool.map(self._read_blob, range(start, start + samples))

    return np.stack(blobs, axis=0)

stream

stream(
    transform: Callable[[Shaped[ndarray, ...]], Any] | None = None,
    batch: int = 0,
) -> Iterator[Shaped[ndarray, ...]]

Get iterable data stream.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `transform` | `Callable[[Shaped[ndarray, ...]], Any] \| None` | callable to apply to the read data. Should take a single sample or batch of samples, and can return an arbitrary type. | `None` |
| `batch` | `int` | batch size to read. If `0`, load only a single sample and do not append an empty axis. | `0` |

Returns:

| Type | Description |
| --- | --- |
| `Iterator[Shaped[ndarray, ...]]` | Iterator which yields successive frames. |

Source code in format/src/roverd/channels/abstract.py
def stream(
    self, transform: Callable[
        [Shaped[np.ndarray, "..."]], Any] | None = None, batch: int = 0
) -> Iterator[Shaped[np.ndarray, "..."]]:
    """Get iterable data stream.

    Args:
        transform: callable to apply to the read data. Should take a single
            sample or batch of samples, and can return an arbitrary type.
        batch: batch size to read. If 0, load only a single sample and do
            not append an empty axis.

    Returns:
        Iterator which yields successive frames.
    """
    if batch <= 0:
        # Single sample mode
        for i in range(self._n_blobs):
            data = self.read(i, 1)[0]  # Remove batch dimension
            if transform is not None:
                data = transform(data)
            yield data
    else:
        # Batch mode
        for start in range(0, self._n_blobs, batch):
            samples_to_read = min(batch, self._n_blobs - start)
            data = self.read(start, samples_to_read)
            if transform is not None:
                data = transform(data)
            yield data
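
For example, to iterate in normalized float batches (a sketch using the hypothetical channel above):

```python
import numpy as np

# Batches of up to 16 frames, each converted to float32 in [0, 1].
stream = channel.stream(
    transform=lambda x: x.astype(np.float32) / 255, batch=16)
for x in stream:
    ...  # x has shape (<=16, 480, 640, 3)
```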

write

write(data: Data, append: bool = False) -> None

Write data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `Data` | data to write, with leading axis corresponding to the number of samples/frames. | *required* |
| `append` | `bool` | append is currently not implemented for blob channels. | `False` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | data type/shape does not match channel specifications. |

Source code in format/src/roverd/channels/abstract.py
def write(self, data: Data, append: bool = False) -> None:
    """Write data.

    Args:
        data: data to write, with leading axis corresponding to the number
            of samples/frames.
        append: append is currently not implemented for blob channels.

    Raises:
        ValueError: data type/shape does not match channel specifications.
    """
    assert append is False, "Append is not implemented."
    if not isinstance(data, np.ndarray):
        raise ValueError("BlobChannels do not allow raw data.")
    if len(data.shape) != len(self.shape) + 1:
        raise ValueError(
            f"Data shape {data.shape} does not match channel shape "
            f"{self.shape}.")

    os.makedirs(self.path, exist_ok=True)
    with ThreadPool(self.workers) as pool:
        pool.map(
            lambda i: self._write_blob(self._n_blobs + i, data[i]),
            range(data.shape[0]))

    self._n_blobs = data.shape[0]

roverd.channels.LzmaChannel

Bases: RawChannel

LZMA-compressed binary data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str` | file path. | *required* |
| `dtype` | `str \| type \| dtype` | data type, or string name of dtype (e.g. `u1`, `f4`). | *required* |
| `shape` | `Sequence[int]` | data shape. | *required* |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `path` | `str` | file path. |
| `type` | `dtype` | numpy data type. |
| `shape` | `tuple[int, ...]` | sample data shape. |
| `size` | `int` | total file size, in bytes. |

Source code in format/src/roverd/channels/lzma.py
class LzmaChannel(RawChannel):
    """LZMA-compressed binary data.

    Args:
        path: file path.
        dtype: data type, or string name of dtype (e.g. `u1`, `f4`).
        shape: data shape.

    Attributes:
        path: file path.
        type: numpy data type.
        shape: sample data shape.
        size: total file size, in bytes.
    """

    @staticmethod
    def _open_r(path: str) -> io.BufferedIOBase:
        return lzma.open(path, 'rb')

    @staticmethod
    def _open_w(path, append: bool = False) -> io.BufferedIOBase:
        return lzma.open(path, 'ab' if append else 'wb')

    def read(
        self, start: int | np.integer = 0, samples: int | np.integer = -1
    ) -> Shaped[np.ndarray, "..."]:
        """Read data.

        Args:
            start: start index to read.
            samples: number of samples/frames to read. If `-1`, read all data.

        Returns:
            Read frames as an array, with a leading axis corresponding to
                the number of `samples`.
        """
        with self._open_r(self.path) as f:
            if start > 0:
                f.seek(self.size * start, 0)
            buf = f.read(-1 if samples == -1 else self.size * samples)

        return self.buffer_to_array(buf, batch=True)

    def write(self, data: Data, append: bool = False) -> None:
        """Write data.

        Args:
            data: data to write.
            append: only `append=False` is allowed.

        Raises:
            ValueError: data type/shape does not match channel specifications.
        """
        if append:
            raise ValueError("Can only write/overwrite an lzma channel.")
        super().write(data, append=False)
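
Since the whole channel is a single LZMA stream, appending is rejected and seeking decompresses from the start of the file; LzmaFrameChannel (below) instead compresses each frame independently to allow random access. A minimal sketch, assuming the import location:

```python
import numpy as np

from roverd.channels import LzmaChannel  # assumed import location

channel = LzmaChannel(path="data/mask", dtype="u1", shape=(64, 128))
data = np.random.randint(0, 2, size=(100, 64, 128), dtype=np.uint8)
channel.write(data)         # `write(..., append=True)` raises ValueError
head = channel.read(0, 10)  # seeking decompresses from the file start
```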

consume

consume(
    stream: Streamable[Data | Sequence[Data]], thread: bool = False
) -> None

Consume iterable or queue and write to file.

  • If Iterable, fetches from the iterator until exhausted (i.e. until StopIteration), then returns.
  • If Queue, .get() from the Queue until None is received, then return.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `stream` | `Streamable[Data \| Sequence[Data]]` | stream to consume. | *required* |
| `thread` | `bool` | if `True`, run in a separate thread, returning immediately instead of blocking until complete. | `False` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | data type/shape does not match channel specifications. |

Source code in format/src/roverd/channels/raw.py
def consume(
    self, stream: Streamable[Data | Sequence[Data]], thread: bool = False
) -> None:
    """Consume iterable or queue and write to file.

    - If `Iterable`, fetches from the iterator until exhausted (i.e. until
      `StopIteration`), then returns.
    - If `Queue`, `.get()` from the `Queue` until `None` is received, then
      return.

    Args:
        stream: stream to consume.
        thread: if `True`, run in a separate thread, returning immediately
            instead of blocking until complete.

    Raises:
        ValueError: data type/shape does not match channel specifications.
    """
    if isinstance(stream, Queue):
        stream = Buffer(stream)
    if thread:
        Thread(target=self.consume, kwargs={"stream": stream}).start()
        return

    with self._open_w(self.path, append=False) as f:
        for data in stream:
            self._verify_type(data)
            f.write(self._serialize(data))

read

read(
    start: int | integer = 0, samples: int | integer = -1
) -> Shaped[ndarray, ...]

Read data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `start` | `int \| integer` | start index to read. | `0` |
| `samples` | `int \| integer` | number of samples/frames to read. If `-1`, read all data. | `-1` |

Returns:

| Type | Description |
| --- | --- |
| `Shaped[ndarray, ...]` | Read frames as an array, with a leading axis corresponding to the number of `samples`. |

Source code in format/src/roverd/channels/lzma.py
def read(
    self, start: int | np.integer = 0, samples: int | np.integer = -1
) -> Shaped[np.ndarray, "..."]:
    """Read data.

    Args:
        start: start index to read.
        samples: number of samples/frames to read. If `-1`, read all data.

    Returns:
        Read frames as an array, with a leading axis corresponding to
            the number of `samples`.
    """
    with self._open_r(self.path) as f:
        if start > 0:
            f.seek(self.size * start, 0)
        buf = f.read(-1 if samples == -1 else self.size * samples)

    return self.buffer_to_array(buf, batch=True)

stream

stream(
    transform: Callable[[Shaped[ndarray, ...]], Any] | None = None,
    batch: int = 0,
) -> Iterator[Shaped[ndarray, ...]]

Get iterable data stream.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `transform` | `Callable[[Shaped[ndarray, ...]], Any] \| None` | callable to apply to the read data. Should take a single sample or batch of samples, and can return an arbitrary type. | `None` |
| `batch` | `int` | batch size to read. If `0`, load only a single sample and do not append an empty axis. | `0` |

Returns:

| Type | Description |
| --- | --- |
| `Iterator[Shaped[ndarray, ...]]` | Iterator which yields successive frames. |

Source code in format/src/roverd/channels/raw.py
def stream(
    self, transform: Callable[
        [Shaped[np.ndarray, "..."]], Any] | None = None, batch: int = 0
) -> Iterator[Shaped[np.ndarray, "..."]]:
    """Get iterable data stream.

    Args:
        transform: callable to apply to the read data. Should take a single
            sample or batch of samples, and can return an arbitrary type.
        batch: batch size to read. If 0, load only a single sample and do
            not append an empty axis.

    Returns:
        Iterator which yields successive frames.
    """
    size = self.size if batch == 0 else batch * self.size

    if transform is None:
        transform = lambda x: x

    with self._open_r(self.path) as fp:
        while True:
            data = cast(bytes, fp.read(size))
            if len(data) < size:
                fp.close()
                partial_batch = len(data) // self.size
                if partial_batch > 0:
                    yield transform(self.buffer_to_array(
                        data[:partial_batch * self.size], batch=(batch != 0)))
                return
            yield transform(self.buffer_to_array(data, batch=(batch != 0)))

write

write(data: Data, append: bool = False) -> None

Write data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `Data` | data to write. | *required* |
| `append` | `bool` | only `append=False` is allowed. | `False` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | data type/shape does not match channel specifications. |

Source code in format/src/roverd/channels/lzma.py
def write(self, data: Data, append: bool = False) -> None:
    """Write data.

    Args:
        data: data to write.
        append: only `append=False` is allowed.

    Raises:
        ValueError: data type/shape does not match channel specifications.
    """
    if append:
        raise ValueError("Can only write/overwrite an lzma channel.")
    super().write(data, append=False)

roverd.channels.LzmaFrameChannel

Bases: Channel

Frame-wise LZMA-compressed binary data.

Should have an additional file with the suffix _i (e.g. mask_i for a channel mask), which contains the starting offset of each frame as a u8 (8-byte unsigned integer).

This file should have the offset for the next unwritten frame as well. As an example, for a channel example with compressed frame sizes [2, 5, 3], example_i should be:

[0, 2, 7, 10]
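
Equivalently, the offsets file is a cumulative sum of the compressed frame sizes, prepended with zero and stored as u8; a sketch of its contents:

```python
import numpy as np

sizes = [2, 5, 3]  # compressed size of each frame, in bytes
offsets = np.concatenate([[0], np.cumsum(sizes)]).astype(np.uint64)
print(offsets)           # [ 0  2  7 10]
raw = offsets.tobytes()  # byte contents of `example_i`
```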

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str` | file path. | *required* |
| `dtype` | `str \| type \| dtype` | data type, or string name of dtype (e.g. `u1`, `f4`). | *required* |
| `shape` | `Sequence[int]` | data shape. | *required* |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `path` | `str` | file path. |
| `type` | `dtype` | numpy data type. |
| `shape` | `tuple[int, ...]` | sample data shape. |
| `size` | `int` | total file size, in bytes. |

Source code in format/src/roverd/channels/lzma.py
class LzmaFrameChannel(Channel):
    """Frame-wise LZMA-compressed binary data.

    Should have an additional file with the suffix `_i` (e.g. `mask_i` for a
    channel `mask`), which contains the starting offset of each frame as a
    u8 (8-byte unsigned integer).

    This file should have the offset for the next unwritten frame as well.
    As an example, for a channel `example` with compressed frame sizes
    `[2, 5, 3]`, `example_i` should be:

    ```python
    [0, 2, 7, 10]
    ```

    Args:
        path: file path.
        dtype: data type, or string name of dtype (e.g. `u1`, `f4`).
        shape: data shape.

    Attributes:
        path: file path.
        type: numpy data type.
        shape: sample data shape.
        size: total file size, in bytes.
    """

    def read(
        self, start: int | np.integer = 0, samples: int | np.integer = -1
    ) -> Shaped[np.ndarray, "samples ..."]:
        """Read data.

        Args:
            start: start index to read.
            samples: number of samples/frames to read. If `-1`, read all data.

        Returns:
            Read frames as an array, with a leading axis corresponding to
                the number of `samples`. If only a subset of frames are
                readable (e.g. due to reaching the end of the data), the
                result is truncated.

        Raises:
            ValueError: None of the frames could be read, possibly due to
                invalid data, or an invalid start index.
        """
        with open(self.path + "_i", "rb") as f:
            if start != 0:
                f.seek(8 * start)

            if samples == -1:
                indices = np.frombuffer(f.read(), dtype=np.uint64)
            else:
                indices = np.frombuffer(
                    f.read((samples + 1) * 8), dtype=np.uint64)

        if len(indices) < 2:
            raise ValueError(f"Could not read indices: {self.path}_i.")

        data = []
        with open(self.path, 'rb') as f:
            f.seek(int(indices[0]), 0)
            for left, right in zip(indices[:-1], indices[1:]):
                decompressed = lzma.decompress(f.read(right - left))
                data.append(self.buffer_to_array(decompressed, final=False))

        return np.concatenate(data, axis=0)

    def write(
        self, data: Data, append: bool = False, preset: int = 0
    ) -> None:
        """Write data.

        Args:
            data: data to write.
            preset: LZMA compression preset (0-9, 0 is fastest, 9 is best).
            append: only `append=False` is allowed.

        Raises:
            ValueError: data type/shape does not match channel specifications.
        """
        if not isinstance(data, np.ndarray):
            raise ValueError("LzmaFrame does not support writing raw data.")

        self._verify_type(data)
        if append:
            raise ValueError("Only overwriting is currently supported.")
        self.consume(data, preset=preset)

    def stream(
        self, transform: Callable[
            [Shaped[np.ndarray, "..."]], Any] | None = None, batch: int = 0
    ) -> Iterator[np.ndarray]:
        """Get iterable data stream.

        Args:
            transform: callable to apply to the read data. Should take a single
                sample or batch of samples, and can return an arbitrary type.
            batch: batch size to read. If 0, load only a single sample and do
                not append an empty axis.

        Returns:
            Iterator which yields successive frames.
        """
        if transform is None:
            transform = lambda x: x

        with open(self.path + "_i", "rb") as f:
            indices = np.frombuffer(f.read(), dtype=np.uint64)

        frames: list[np.ndarray] = []
        with open(self.path, 'rb') as f:
            for left, right in zip(indices[:-1], indices[1:]):
                if batch != 0 and len(frames) == batch:
                    yield transform(np.concatenate(frames, axis=0))
                    frames = []

                decompressed = self.buffer_to_array(
                    lzma.decompress(f.read(right - left)))
                if batch == 0:
                    yield transform(decompressed[0])
                else:
                    frames.append(decompressed)

        if len(frames) > 0:
            yield transform(np.concatenate(frames, axis=0))

    def _consume(self, stream: Iterable[list[bytes]]) -> None:
        """Consume pre-compressed stream."""
        main = open(self.path, 'wb')
        offsets = open(self.path + "_i", 'wb')
        tail = 0
        offsets.write(np.array(tail, dtype=np.uint64).tobytes())

        for frame in stream:
            for x in frame:
                main.write(x)
                tail += len(x)
                offsets.write(np.array(tail, dtype=np.uint64).tobytes())

        main.close()
        offsets.close()

    def consume(
        self, stream: Streamable[Data | Sequence[Data]],
        thread: bool = False, preset: int = 0, batch: int = 8
    ) -> None:
        """Consume iterable or queue and write to file.

        - If `Iterable`, fetches from the iterator until exhausted (i.e. until
          `StopIteration`), then returns.
        - If `Queue`, `.get()` from the `Queue` until `None` is received, then
          return.

        Args:
            stream: stream to consume; possibly already batched (see `batch`).
            thread: whether to run in a separate thread, returning immediately
                instead of blocking until complete.
            preset: lzma compression preset to use.
            batch: aggregate, then run this many lzma compressions in
                parallel. Necessary for throughput, since Python's lzma
                compresses on a single thread. If `batch=0`, we assume that
                the input stream is already batched.

        Raises:
            ValueError: data type/shape does not match channel specifications.
        """
        if isinstance(stream, Queue):
            stream = Buffer(stream)
        # => stream is Iterator | Iterable

        if thread:
            Thread(target=self.consume, kwargs={
                "stream": stream, "preset": preset, "batch": batch
            }).start()
            return

        if batch != 0:
            stream_not_batched = cast(Iterable[Data] | Iterator[Data], stream)
            stream = batch_iterator(stream_not_batched, size=batch)

        def _compress_batch(data: Data | Sequence[Data]):
            self._verify_type(data)
            if not isinstance(data[0], np.ndarray):
                raise ValueError(
                    "LzmaFrame requires in put data to be a `np.ndarray`. "
                    "In particular, raw data (bytes) is not allowed.")

            with ThreadPool(processes=len(data)) as p:
                return p.map(
                    lambda x: lzma.compress(x.data, preset=preset),
                    cast(Sequence[np.ndarray], data))

        self._consume(Prefetch(
            _compress_batch(x) for x in
            cast(Iterable[Sequence[Data]] | Iterator[Sequence[Data]], stream)))
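
A write/read roundtrip sketch (the import location is assumed; the offsets file is created alongside the data file):

```python
import numpy as np

from roverd.channels import LzmaFrameChannel  # assumed import location

channel = LzmaFrameChannel(path="data/mask", dtype="u1", shape=(64, 128))
frames = np.random.randint(0, 2, size=(32, 64, 128), dtype=np.uint8)
channel.write(frames, preset=1)            # writes data/mask and data/mask_i
middle = channel.read(start=8, samples=4)  # random access via the offsets
```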

consume

consume(
    stream: Streamable[Data | Sequence[Data]],
    thread: bool = False,
    preset: int = 0,
    batch: int = 8,
) -> None

Consume iterable or queue and write to file.

  • If Iterable, fetches from the iterator until exhausted (i.e. until StopIteration), then returns.
  • If Queue, .get() from the Queue until None is received, then return.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `stream` | `Streamable[Data \| Sequence[Data]]` | stream to consume; possibly already batched (see `batch`). | *required* |
| `thread` | `bool` | whether to run in a separate thread, returning immediately instead of blocking until complete. | `False` |
| `preset` | `int` | lzma compression preset to use. | `0` |
| `batch` | `int` | aggregate, then run this many lzma compressions in parallel; necessary for throughput, since Python's lzma compresses on a single thread. If `batch=0`, the input stream is assumed to be already batched. | `8` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | data type/shape does not match channel specifications. |

Source code in format/src/roverd/channels/lzma.py
def consume(
    self, stream: Streamable[Data | Sequence[Data]],
    thread: bool = False, preset: int = 0, batch: int = 8
) -> None:
    """Consume iterable or queue and write to file.

    - If `Iterable`, fetches from the iterator until exhausted (i.e. until
      `StopIteration`), then returns.
    - If `Queue`, `.get()` from the `Queue` until `None` is received, then
      return.

    Args:
        stream: stream to consume; possibly already batched (see `batch`).
        thread: whether to run in a separate thread, returning immediately
            instead of blocking until complete.
        preset: lzma compression preset to use.
        batch: aggregate, then run this many lzma compressions in
            parallel. Necessary for throughput, since Python's lzma
            compresses on a single thread. If `batch=0`, we assume that
            the input stream is already batched.

    Raises:
        ValueError: data type/shape does not match channel specifications.
    """
    if isinstance(stream, Queue):
        stream = Buffer(stream)
    # => stream is Iterator | Iterable

    if thread:
        Thread(target=self.consume, kwargs={
            "stream": stream, "preset": preset, "batch": batch
        }).start()
        return

    if batch != 0:
        stream_not_batched = cast(Iterable[Data] | Iterator[Data], stream)
        stream = batch_iterator(stream_not_batched, size=batch)

    def _compress_batch(data: Data | Sequence[Data]):
        self._verify_type(data)
        if not isinstance(data[0], np.ndarray):
            raise ValueError(
                "LzmaFrame requires in put data to be a `np.ndarray`. "
                "In particular, raw data (bytes) is not allowed.")

        with ThreadPool(processes=len(data)) as p:
            return p.map(
                lambda x: lzma.compress(x.data, preset=preset),
                cast(Sequence[np.ndarray], data))

    self._consume(Prefetch(
        _compress_batch(x) for x in
        cast(Iterable[Sequence[Data]] | Iterator[Sequence[Data]], stream)))

read

read(
    start: int | integer = 0, samples: int | integer = -1
) -> Shaped[ndarray, "samples ..."]

Read data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `start` | `int \| integer` | start index to read. | `0` |
| `samples` | `int \| integer` | number of samples/frames to read. If `-1`, read all data. | `-1` |

Returns:

| Type | Description |
| --- | --- |
| `Shaped[ndarray, 'samples ...']` | Read frames as an array, with a leading axis corresponding to the number of `samples`. If only a subset of frames are readable (e.g. due to reaching the end of the data), the result is truncated. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | None of the frames could be read, possibly due to invalid data, or an invalid start index. |

Source code in format/src/roverd/channels/lzma.py
def read(
    self, start: int | np.integer = 0, samples: int | np.integer = -1
) -> Shaped[np.ndarray, "samples ..."]:
    """Read data.

    Args:
        start: start index to read.
        samples: number of samples/frames to read. If `-1`, read all data.

    Returns:
        Read frames as an array, with a leading axis corresponding to
            the number of `samples`. If only a subset of frames are
            readable (e.g. due to reaching the end of the data), the
            result is truncated.

    Raises:
        ValueError: None of the frames could be read, possibly due to
            invalid data, or an invalid start index.
    """
    with open(self.path + "_i", "rb") as f:
        if start != 0:
            f.seek(8 * start)

        if samples == -1:
            indices = np.frombuffer(f.read(), dtype=np.uint64)
        else:
            indices = np.frombuffer(
                f.read((samples + 1) * 8), dtype=np.uint64)

    if len(indices) < 2:
        raise ValueError(f"Could not read indices: {self.path}_i.")

    data = []
    with open(self.path, 'rb') as f:
        f.seek(int(indices[0]), 0)
        for left, right in zip(indices[:-1], indices[1:]):
            decompressed = lzma.decompress(f.read(right - left))
            data.append(self.buffer_to_array(decompressed, final=False))

    return np.concatenate(data, axis=0)

stream

stream(
    transform: Callable[[Shaped[ndarray, ...]], Any] | None = None,
    batch: int = 0,
) -> Iterator[ndarray]

Get iterable data stream.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `transform` | `Callable[[Shaped[ndarray, ...]], Any] \| None` | callable to apply to the read data. Should take a single sample or batch of samples, and can return an arbitrary type. | `None` |
| `batch` | `int` | batch size to read. If `0`, load only a single sample and do not append an empty axis. | `0` |

Returns:

| Type | Description |
| --- | --- |
| `Iterator[ndarray]` | Iterator which yields successive frames. |

Source code in format/src/roverd/channels/lzma.py
def stream(
    self, transform: Callable[
        [Shaped[np.ndarray, "..."]], Any] | None = None, batch: int = 0
) -> Iterator[np.ndarray]:
    """Get iterable data stream.

    Args:
        transform: callable to apply to the read data. Should take a single
            sample or batch of samples, and can return an arbitrary type.
        batch: batch size to read. If 0, load only a single sample and do
            not append an empty axis.

    Returns:
        Iterator which yields successive frames.
    """
    if transform is None:
        transform = lambda x: x

    with open(self.path + "_i", "rb") as f:
        indices = np.frombuffer(f.read(), dtype=np.uint64)

    frames: list[np.ndarray] = []
    with open(self.path, 'rb') as f:
        for left, right in zip(indices[:-1], indices[1:]):
            if batch != 0 and len(frames) == batch:
                yield transform(np.concatenate(frames, axis=0))
                frames = []

            decompressed = self.buffer_to_array(
                lzma.decompress(f.read(right - left)))
            if batch == 0:
                yield transform(decompressed[0])
            else:
                frames.append(decompressed)

    if len(frames) > 0:
        yield transform(np.concatenate(frames, axis=0))

write

write(data: Data, append: bool = False, preset: int = 0) -> None

Write data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `Data` | data to write. | *required* |
| `preset` | `int` | LZMA compression preset (0-9; 0 is fastest, 9 is best). | `0` |
| `append` | `bool` | only `append=False` is allowed. | `False` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | data type/shape does not match channel specifications. |

Source code in format/src/roverd/channels/lzma.py
def write(
    self, data: Data, append: bool = False, preset: int = 0
) -> None:
    """Write data.

    Args:
        data: data to write.
        preset: LZMA compression preset (0-9, 0 is fastest, 9 is best).
        append: only `append=False` is allowed.

    Raises:
        ValueError: data type/shape does not match channel specifications.
    """
    if not isinstance(data, np.ndarray):
        raise ValueError("LzmaFrame does not support writing raw data.")

    self._verify_type(data)
    if append:
        raise ValueError("Only overwriting is currently supported.")
    self.consume(data, preset=preset)

roverd.channels.NPZBlobChannel

Bases: BlobChannel

Blob channel consisting of .npz files.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str` | file path. | *required* |
| `dtype` | `str \| type \| dtype` | data type, or string name of dtype (e.g. `u1`, `f4`). | *required* |
| `shape` | `Sequence[int]` | data shape. | *required* |
| `workers` | `int` | maximum number of worker threads to use for I/O. | `8` |
| `length` | `int \| None` | number of blobs, potentially calculated in some more efficient way. If `None`, will be calculated by counting files in the directory. | `None` |
| `compress` | `bool` | whether to use compression when writing `.npz` files. | `False` |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `path` | `str` | file path. |
| `type` | `dtype` | numpy data type. |
| `shape` | `tuple[int, ...]` | sample data shape. |
| `size` | `int` | total file size, in bytes. |

Source code in format/src/roverd/channels/blob.py
class NPZBlobChannel(BlobChannel):
    """Blob channel consisting of `.npz` files.

    Args:
        path: file path.
        dtype: data type, or string name of dtype (e.g. `u1`, `f4`).
        shape: data shape.
        workers: maximum number of worker threads to use for I/O.
        length: number of blobs, potentially calculated in some more
            efficient way. If `None`, will be calculated by counting files
            in the directory.
        compress: whether to use compression when writing `.npz` files.

    Attributes:
        path: file path.
        type: numpy data type.
        shape: sample data shape.
        size: total file size, in bytes.
    """

    def __init__(
        self, path: str, dtype: str | type | np.dtype, shape: Sequence[int],
        workers: int = 8, length: int | None = None, compress: bool = False
    ) -> None:
        super().__init__(path, dtype, shape, workers, length)
        self.compress = compress

    def _read_blob(self, index: int) -> np.ndarray:
        """Load a blob from file.

        Args:
            index: index of the blob to load.

        Returns:
            The loaded blob as a numpy array.
        """
        filename = self._filename(index) + ".npz"
        if not os.path.exists(filename):
            raise IndexError(f"Blob index {index} does not exist.")

        with np.load(filename) as data:
            return data['data']

    def _write_blob(self, index: int, data: np.ndarray) -> None:
        """Write a blob to a file.

        Args:
            index: index of the blob to load.
            data: data to write.
        """
        filename = self._filename(index) + ".npz"
        if self.compress:
            np.savez_compressed(filename, data=data)
        else:
            np.savez(filename, data=data)
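
A sketch showing the compress flag (import location assumed); compress=True uses np.savez_compressed, trading write speed for smaller files:

```python
import numpy as np

from roverd.channels import NPZBlobChannel  # assumed import location

channel = NPZBlobChannel(
    path="data/points", dtype="f4", shape=(1024, 3), compress=True)
channel.write(np.zeros((5, 1024, 3), dtype=np.float32))  # one .npz per frame
```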

consume

consume(
    stream: Streamable[Data | Sequence[Data]],
    thread: bool = False,
    append: bool = False,
) -> None

Consume iterable or queue and write to file.

  • If Iterable, fetches from the iterator until exhausted (i.e. until StopIteration), then returns.
  • If Queue, .get() from the Queue until None is received, then return.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `stream` | `Streamable[Data \| Sequence[Data]]` | stream to consume. | *required* |
| `thread` | `bool` | whether to run in a separate thread, returning immediately instead of blocking until complete. | `False` |
| `append` | `bool` | whether to append to existing blobs rather than overwrite them. | `False` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | data type/shape does not match channel specifications. |

Source code in format/src/roverd/channels/abstract.py
def consume(
    self, stream: Streamable[Data | Sequence[Data]], thread: bool = False,
    append: bool = False
) -> None:
    """Consume iterable or queue and write to file.

    - If `Iterable`, fetches from the iterator until exhausted (i.e. until
      `StopIteration`), then returns.
    - If `Queue`, `.get()` from the `Queue` until `None` is received, then
      return.

    Args:
        stream: stream to consume.
        thread: whether to run in a separate thread, returning immediately
            instead of blocking until complete.
        append: whether to append or overwrite existing blobs.

    Raises:
        ValueError: data type/shape does not match channel specifications.
    """
    if isinstance(stream, Queue):
        stream = Buffer(stream)
    if thread:
        Thread(target=self.consume, kwargs={
            "stream": stream, "append": append}).start()
        return

    if not append:
        self._n_blobs = 0

    os.makedirs(self.path, exist_ok=True)
    for frame in stream:
        if not isinstance(frame, np.ndarray):
            raise ValueError("BlobChannels do not allow raw data.")
        if len(frame.shape) != len(self.shape):
            raise ValueError(
                "BlobChannels do not allow batched write data.")

        self._verify_type(frame)
        self._write_blob(self._n_blobs, frame)
        self._n_blobs += 1

read

read(
    start: int | integer = 0, samples: int | integer = -1
) -> Shaped[ndarray, ...]

Read data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `start` | `int \| integer` | start index to read. | `0` |
| `samples` | `int \| integer` | number of samples/frames to read. If `-1`, read all data. | `-1` |

Returns:

| Type | Description |
| --- | --- |
| `Shaped[ndarray, ...]` | Read frames as an array, with a leading axis corresponding to the number of `samples`. |

Source code in format/src/roverd/channels/abstract.py
def read(
    self, start: int | np.integer = 0, samples: int | np.integer = -1
) -> Shaped[np.ndarray, "..."]:
    """Read data.

    Args:
        start: start index to read.
        samples: number of samples/frames to read. If `-1`, read all data.

    Returns:
        Read frames as an array, with a leading axis corresponding to
            the number of `samples`.
    """
    if samples == -1:
        samples = self._n_blobs - start

    if start < 0 or start + samples > self._n_blobs:
        raise IndexError(
            f"Read indices ({start, start + samples}) out of range "
            f"(0, {self._n_blobs}).")

    with ThreadPool(int(min(self.workers, samples))) as pool:
        blobs = pool.map(self._read_blob, range(start, start + samples))

    return np.stack(blobs, axis=0)

stream

stream(
    transform: Callable[[Shaped[ndarray, ...]], Any] | None = None,
    batch: int = 0,
) -> Iterator[Shaped[ndarray, ...]]

Get iterable data stream.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `transform` | `Callable[[Shaped[ndarray, ...]], Any] \| None` | callable to apply to the read data. Should take a single sample or batch of samples, and can return an arbitrary type. | `None` |
| `batch` | `int` | batch size to read. If `0`, load only a single sample and do not append an empty axis. | `0` |

Returns:

| Type | Description |
| --- | --- |
| `Iterator[Shaped[ndarray, ...]]` | Iterator which yields successive frames. |

Source code in format/src/roverd/channels/abstract.py
def stream(
    self, transform: Callable[
        [Shaped[np.ndarray, "..."]], Any] | None = None, batch: int = 0
) -> Iterator[Shaped[np.ndarray, "..."]]:
    """Get iterable data stream.

    Args:
        transform: callable to apply to the read data. Should take a single
            sample or batch of samples, and can return an arbitrary type.
        batch: batch size to read. If 0, load only a single sample and do
            not append an empty axis.

    Returns:
        Iterator which yields successive frames.
    """
    if batch <= 0:
        # Single sample mode
        for i in range(self._n_blobs):
            data = self.read(i, 1)[0]  # Remove batch dimension
            if transform is not None:
                data = transform(data)
            yield data
    else:
        # Batch mode
        for start in range(0, self._n_blobs, batch):
            samples_to_read = min(batch, self._n_blobs - start)
            data = self.read(start, samples_to_read)
            if transform is not None:
                data = transform(data)
            yield data

write

write(data: Data, append: bool = False) -> None

Write data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `Data` | data to write, with leading axis corresponding to the number of samples/frames. | *required* |
| `append` | `bool` | append is currently not implemented for blob channels. | `False` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | data type/shape does not match channel specifications. |

Source code in format/src/roverd/channels/abstract.py
def write(self, data: Data, append: bool = False) -> None:
    """Write data.

    Args:
        data: data to write, with leading axis corresponding to the number
            of samples/frames.
        append: append is currently not implemented for blob channels.

    Raises:
        ValueError: data type/shape does not match channel specifications.
    """
    assert append is False, "Append is not implemented."
    if not isinstance(data, np.ndarray):
        raise ValueError("BlobChannels do not allow raw data.")
    if len(data.shape) != len(self.shape) + 1:
        raise ValueError(
            f"Data shape {data.shape} does not match channel shape "
            f"{self.shape}.")

    os.makedirs(self.path, exist_ok=True)
    with ThreadPool(self.workers) as pool:
        pool.map(
            lambda i: self._write_blob(self._n_blobs + i, data[i]),
            range(data.shape[0]))

    self._n_blobs = data.shape[0]

roverd.channels.RawChannel

Bases: Channel

Raw (uncompressed) data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str` | file path. | *required* |
| `dtype` | `str \| type \| dtype` | data type, or string name of dtype (e.g. `u1`, `f4`). | *required* |
| `shape` | `Sequence[int]` | data shape. | *required* |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `path` | `str` | file path. |
| `type` | `dtype` | numpy data type. |
| `shape` | `tuple[int, ...]` | sample data shape. |
| `size` | `int` | total file size, in bytes. |

Source code in format/src/roverd/channels/raw.py
class RawChannel(Channel):
    """Raw (uncompressed) data.

    Args:
        path: file path.
        dtype: data type, or string name of dtype (e.g. `u1`, `f4`).
        shape: data shape.

    Attributes:
        path: file path.
        type: numpy data type.
        shape: sample data shape.
        size: total file size, in bytes.
    """

    @staticmethod
    def _open_r(path: str) -> io.BufferedIOBase:
        return open(path, 'rb')

    @staticmethod
    def _open_w(path, append: bool = False) -> io.BufferedIOBase:
        return open(path, 'ab' if append else 'wb')

    def read(
        self, start: int | np.integer = 0, samples: int | np.integer = -1
    ) -> Shaped[np.ndarray, "..."]:
        """Read data.

        !!! info

            We read through `bytearray -> memoryview -> np.frombuffer` to
            provide a read-write buffer without requiring an additional copy.
            This is required for full functionality in downstream applications,
            e.g. [`torch.from_numpy`](
            https://docs.pytorch.org/docs/stable/generated/torch.from_numpy.html).

            Note that this is valid since the bytearray is not returned, so
            ownership is passed to the returned numpy array.

        Args:
            start: start index to read.
            samples: number of samples/frames to read. If `-1`, read all data.

        Returns:
            Read frames as an array, with a leading axis corresponding to
                the number of `samples`.
        """
        with self._open_r(self.path) as f:
            if start > 0:
                f.seek(self.size * start, 0)

            size = self.filesize if samples == -1 else samples * self.size
            buf = bytearray(size)
            f.readinto(memoryview(buf))

        return self.buffer_to_array(buf, batch=True)

    def write(self, data: Data, append: bool = False) -> None:
        """Write data.

        Args:
            data: data to write.
            append: if `True`, append to the file instead of overwriting it.

        Raises:
            ValueError: data type/shape does not match channel specifications.
        """
        self._verify_type(data)
        with self._open_w(self.path, append) as f:
            f.write(self._serialize(data))

    def stream(
        self, transform: Callable[
            [Shaped[np.ndarray, "..."]], Any] | None = None, batch: int = 0
    ) -> Iterator[Shaped[np.ndarray, "..."]]:
        """Get iterable data stream.

        Args:
            transform: callable to apply to the read data. Should take a single
                sample or batch of samples, and can return an arbitrary type.
            batch: batch size to read. If 0, load only a single sample and do
                not append an empty axis.

        Returns:
            Iterator which yields successive frames.
        """
        size = self.size if batch == 0 else batch * self.size

        if transform is None:
            transform = lambda x: x

        with self._open_r(self.path) as fp:
            while True:
                data = cast(bytes, fp.read(size))
                if len(data) < size:
                    fp.close()
                    partial_batch = len(data) // self.size
                    if partial_batch > 0:
                        yield transform(self.buffer_to_array(
                            data[:partial_batch * self.size], batch=(batch != 0)))
                    return
                yield transform(self.buffer_to_array(data, batch=(batch != 0)))

    def consume(
        self, stream: Streamable[Data | Sequence[Data]], thread: bool = False
    ) -> None:
        """Consume iterable or queue and write to file.

        - If `Iterable`, fetches from the iterator until exhausted (i.e. until
          `StopIteration`), then returns.
        - If `Queue`, `.get()` from the `Queue` until `None` is received, then
          return.

        Args:
            stream: stream to consume.
            thread: if `True`, run in a separate thread, returning immediately
                instead of blocking until complete.

        Raises:
            ValueError: data type/shape does not match channel specifications.
        """
        if isinstance(stream, Queue):
            stream = Buffer(stream)
        if thread:
            Thread(target=self.consume, kwargs={"stream": stream}).start()
            return

        with self._open_w(self.path, append=False) as f:
            for data in stream:
                self._verify_type(data)
                f.write(self._serialize(data))

    def memmap(self) -> np.memmap:
        """Open memory mapped array."""
        return np.memmap(
            self.path, dtype=self.type, mode='r',
            shape=(self.filesize // self.size, *self.shape))

consume

consume(
    stream: Streamable[Data | Sequence[Data]], thread: bool = False
) -> None

Consume iterable or queue and write to file.

  • If Iterable, fetches from the iterator until exhausted (i.e. until StopIteration), then returns.
  • If Queue, .get() from the Queue until None is received, then return.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `stream` | `Streamable[Data \| Sequence[Data]]` | stream to consume. | *required* |
| `thread` | `bool` | if `True`, run in a separate thread, returning immediately instead of blocking until complete. | `False` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | data type/shape does not match channel specifications. |

Source code in format/src/roverd/channels/raw.py
def consume(
    self, stream: Streamable[Data | Sequence[Data]], thread: bool = False
) -> None:
    """Consume iterable or queue and write to file.

    - If `Iterable`, fetches from the iterator until exhausted (i.e. until
      `StopIteration`), then returns.
    - If `Queue`, `.get()` from the `Queue` until `None` is received, then
      return.

    Args:
        stream: stream to consume.
        thread: if `True`, run in a separate thread, returning immediately
            instead of blocking until complete.

    Raises:
        ValueError: data type/shape does not match channel specifications.
    """
    if isinstance(stream, Queue):
        stream = Buffer(stream)
    if thread:
        Thread(target=self.consume, kwargs={"stream": stream}).start()
        return

    with self._open_w(self.path, append=False) as f:
        for data in stream:
            self._verify_type(data)
            f.write(self._serialize(data))

memmap

memmap() -> memmap

Open memory mapped array.

Source code in format/src/roverd/channels/raw.py
def memmap(self) -> np.memmap:
    """Open memory mapped array."""
    return np.memmap(
        self.path, dtype=self.type, mode='r',
        shape=(self.filesize // self.size, *self.shape))
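
Since a raw channel is a flat file with a fixed per-sample stride, memmap gives random access without reading the whole file into memory (a sketch; the import location is assumed, and the file must already exist):

```python
import numpy as np

from roverd.channels import RawChannel  # assumed import location

channel = RawChannel(path="data/imu", dtype="f4", shape=(6,))
arr = channel.memmap()           # read-only view, shape (n_samples, 6)
window = np.array(arr[100:200])  # only the touched pages are read from disk
```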

read

read(
    start: int | integer = 0, samples: int | integer = -1
) -> Shaped[ndarray, ...]

Read data.

Info

We read through bytearray -> memoryview -> np.frombuffer to provide a read-write buffer without requiring an additional copy. This is required for full functionality in downstream applications, e.g. torch.from_numpy.

Note that this is valid since the bytearray is not returned, so ownership is passed to the returned numpy array.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `start` | `int \| integer` | start index to read. | `0` |
| `samples` | `int \| integer` | number of samples/frames to read. If `-1`, read all data. | `-1` |

Returns:

| Type | Description |
| --- | --- |
| `Shaped[ndarray, ...]` | Read frames as an array, with a leading axis corresponding to the number of `samples`. |

Source code in format/src/roverd/channels/raw.py
def read(
    self, start: int | np.integer = 0, samples: int | np.integer = -1
) -> Shaped[np.ndarray, "..."]:
    """Read data.

    !!! info

        We read through `bytearray -> memoryview -> np.frombuffer` to
        provide a read-write buffer without requiring an additional copy.
        This is required for full functionality in downstream applications,
        e.g. [`torch.from_numpy`](
        https://docs.pytorch.org/docs/stable/generated/torch.from_numpy.html).

        Note that this is valid since the bytearray is not returned, so
        ownership is passed to the returned numpy array.

    Args:
        start: start index to read.
        samples: number of samples/frames to read. If `-1`, read all data.

    Returns:
        Read frames as an array, with a leading axis corresponding to
            the number of `samples`.
    """
    with self._open_r(self.path) as f:
        if start > 0:
            f.seek(self.size * start, 0)

        size = self.filesize if samples == -1 else samples * self.size
        buf = bytearray(size)
        f.readinto(memoryview(buf))

    return self.buffer_to_array(buf, batch=True)

stream

stream(
    transform: Callable[[Shaped[ndarray, ...]], Any] | None = None,
    batch: int = 0,
) -> Iterator[Shaped[ndarray, ...]]

Get iterable data stream.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `transform` | `Callable[[Shaped[ndarray, ...]], Any] \| None` | callable to apply to the read data. Should take a single sample or batch of samples, and can return an arbitrary type. | `None` |
| `batch` | `int` | batch size to read. If `0`, load only a single sample and do not append an empty axis. | `0` |

Returns:

| Type | Description |
| --- | --- |
| `Iterator[Shaped[ndarray, ...]]` | Iterator which yields successive frames. |

Source code in format/src/roverd/channels/raw.py
def stream(
    self, transform: Callable[
        [Shaped[np.ndarray, "..."]], Any] | None = None, batch: int = 0
) -> Iterator[Shaped[np.ndarray, "..."]]:
    """Get iterable data stream.

    Args:
        transform: callable to apply to the read data. Should take a single
            sample or batch of samples, and can return an arbitrary type.
        batch: batch size to read. If 0, load only a single sample and do
            not append an empty axis.

    Returns:
        Iterator which yields successive frames.
    """
    size = self.size if batch == 0 else batch * self.size

    if transform is None:
        transform = lambda x: x

    with self._open_r(self.path) as fp:
        while True:
            data = cast(bytes, fp.read(size))
            if len(data) < size:
                fp.close()
                partial_batch = len(data) // self.size
                if partial_batch > 0:
                    yield transform(self.buffer_to_array(
                        data[:partial_batch * self.size], batch=(batch != 0)))
                return
            yield transform(self.buffer_to_array(data, batch=(batch != 0)))

write

write(data: Data, append: bool = False) -> None

Write data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `Data` | data to write. | *required* |
| `append` | `bool` | if `True`, append to the file instead of overwriting it. | `False` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | data type/shape does not match channel specifications. |

Source code in format/src/roverd/channels/raw.py
def write(self, data: Data, append: bool = False) -> None:
    """Write data.

    Args:
        data: data to write.
        append: if `True`, append to the file instead of overwriting it.

    Raises:
        ValueError: data type/shape does not match channel specifications.
    """
    self._verify_type(data)
    with self._open_w(self.path, append) as f:
        f.write(self._serialize(data))

roverd.channels.VideoChannel

Bases: Channel

Video data.

Warning

While opencv is a heavy dependency, it seems to have very efficient seeking for mjpeg compared to imageio, the other library that I tested. Using opencv-python-headless instead of the default opencv should alleviate some of these issues.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str` | file path. | *required* |
| `dtype` | `str \| type \| dtype` | data type, or string name of dtype (e.g. `u1`, `f4`). | *required* |
| `shape` | `Sequence[int]` | data shape. | *required* |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `path` | `str` | file path. |
| `type` | `dtype` | numpy data type. |
| `shape` | `tuple[int, ...]` | sample data shape. |
| `size` | `int` | total file size, in bytes. |
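
A decoding sketch (import location assumed; the file must be an existing MJPEG video, and frames are returned as RGB):

```python
from roverd.channels import VideoChannel  # assumed import location

channel = VideoChannel(path="data/camera.avi", dtype="u1", shape=(480, 640, 3))
clip = channel.read(start=30, samples=60)  # (60, 480, 640, 3) RGB array
for frame in channel.stream():             # or decode frame-by-frame
    ...
```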

Source code in format/src/roverd/channels/video.py
class VideoChannel(Channel):
    """Video data.

    !!! warning

        While opencv is a heavy dependency, it seems to have very efficient
        seeking for mjpeg compared to `imageio`, the other library that I
        tested. Using `opencv-python-headless` instead of the default opencv
        should alleviate some of these issues.

    Args:
        path: file path.
        dtype: data type, or string name of dtype (e.g. `u1`, `f4`).
        shape: data shape.

    Attributes:
        path: file path.
        type: numpy data type.
        shape: sample data shape.
        size: total file size, in bytes.
    """

    @cached_property
    def _cv2_module(self):
        try:
            import cv2
            return cv2
        except ImportError:
            raise ImportError(
                "Could not import cv2. `opencv-python` or "
                "`opencv-python-headless` must be installed in order to use "
                "video encoding or decoding.")

    def read(
        self, start: int | np.integer = 0, samples: int | np.integer = -1
    ) -> Shaped[np.ndarray, "samples ..."]:
        """Read data.

        Args:
            start: start index to read.
            samples: number of samples/frames to read. If `-1`, read all data.

        Returns:
            Read frames as an array, with a leading axis corresponding to
                the number of `samples`. If only a subset of frames are
                readable (e.g. due to reaching the end of the video), the
                result is truncated.

        Raises:
            ValueError: None of the frames could be read, possibly due to
                an invalid video, or invalid start index.
        """
        cap = self._cv2_module.VideoCapture(self.path)

        if start != 0:
            # CAP_PROP_POS_FRAMES is the 0-based index of the next frame to
            # decode, so seek directly to `start`.
            cap.set(self._cv2_module.CAP_PROP_POS_FRAMES, start)

        frames: list[np.ndarray] = []
        while cap.isOpened():
            ret, frame = cap.read()
            # `len(frames) != samples` is always true when `samples == -1`,
            # so in that case we read until the video is exhausted.
            if ret and len(frames) != samples:
                frames.append(frame[..., ::-1])
            else:
                break

        cap.release()
        if len(frames) == 0:
            raise ValueError("Could not read any frames.")
        return np.stack(frames)

    def stream(
        self, transform: Callable[
            [Shaped[np.ndarray, "..."]], Any] | None = None, batch: int = 0
    ) -> Iterator[np.ndarray]:
        """Get iterable data stream.

        Args:
            transform: callable to apply to the read data. Should take a single
                sample or batch of samples, and can return an arbitrary type.
            batch: batch size to read. If 0, load only a single sample and do
                not append an empty axis.

        Returns:
            Iterator which yields successive frames.
        """
        if transform is None:
            transform = lambda x: x

        cap = self._cv2_module.VideoCapture(self.path)
        frames: list[np.ndarray] = []
        while cap.isOpened():
            if batch != 0 and len(frames) == batch:
                yield transform(np.stack(frames))
                frames = []

            ret, frame = cap.read()
            if ret:
                if batch == 0:
                    yield transform(frame[..., ::-1])
                else:
                    frames.append(frame[..., ::-1])
            else:
                break

        if len(frames) > 0:
            yield transform(np.stack(frames))

        cap.release()
        return

    def write(
        self, data: Data, append: bool = False
    ) -> None:
        """Write data.

        Args:
            data: data to write.
            append: if `True`, append to the file instead of overwriting it.

        Raises:
            ValueError: data type/shape does not match channel specifications.
        """
        if not isinstance(data, np.ndarray):
            raise ValueError("VideoChannel can only write numpy arrays.")
        self.consume(frame for frame in data)

    def consume(
        self, stream: Streamable[Data | Sequence[Data]],
        thread: bool = False, fps: float = 10.0
    ) -> None:
        """Consume iterable or queue and write to file.

        - If `Iterable`, fetches from the iterator until exhausted (i.e. until
          `StopIteration`), then returns.
        - If `Queue`, `.get()` from the `Queue` until `None` is received, then
          return.

        Args:
            stream: stream to consume.
            fps: video framerate.
            thread: if `True`, run in a separate thread and return immediately
                instead of blocking until the stream is exhausted.

        Raises:
            ValueError: data type/shape does not match channel specifications.
        """
        if isinstance(stream, Queue):
            stream = cast(Iterable[Data], Buffer(stream))
        if thread:
            Thread(
                target=self.consume, kwargs={"stream": stream, "fps": fps}
            ).start()
            return

        fourcc = self._cv2_module.VideoWriter_fourcc(*'MJPG')  # type: ignore
        writer = self._cv2_module.VideoWriter(
            self.path, fourcc, fps, (self.shape[1], self.shape[0]))
        for frame in stream:
            if not isinstance(frame, np.ndarray):
                raise ValueError("VideoChannel does not allow raw data.")
            self._verify_type(frame)
            writer.write(self._cv2_module.cvtColor(
                frame, self._cv2_module.COLOR_RGB2BGR))
        writer.release()
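
A minimal construction sketch (the path and shape are hypothetical; `shape` is `(height, width, channels)`, matching the `(self.shape[1], self.shape[0])` frame size passed to `VideoWriter` above):

Example

from roverd.channels import VideoChannel

# Hypothetical 480x640 RGB camera stream stored as MJPEG.
video = VideoChannel("data/camera.avi", dtype="u1", shape=(480, 640, 3))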

consume

consume(
    stream: Streamable[Data | Sequence[Data]],
    thread: bool = False,
    fps: float = 10.0,
) -> None

Consume iterable or queue and write to file.

- If `Iterable`, fetches from the iterator until exhausted (i.e. until `StopIteration`), then returns.
- If `Queue`, `.get()` from the `Queue` until `None` is received, then return.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `stream` | `Streamable[Data \| Sequence[Data]]` | stream to consume. | *required* |
| `fps` | `float` | video framerate. | `10.0` |
| `thread` | `bool` | if `True`, run in a separate thread and return immediately instead of blocking. | `False` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | data type/shape does not match channel specifications. |

Source code in format/src/roverd/channels/video.py
def consume(
    self, stream: Streamable[Data | Sequence[Data]],
    thread: bool = False, fps: float = 10.0
) -> None:
    """Consume iterable or queue and write to file.

    - If `Iterable`, fetches from the iterator until exhausted (i.e. until
      `StopIteration`), then returns.
    - If `Queue`, `.get()` from the `Queue` until `None` is received, then
      return.

    Args:
        stream: stream to consume.
        fps: video framerate.
        thread: if `True`, run in a separate thread and return immediately
            instead of blocking until the stream is exhausted.

    Raises:
        ValueError: data type/shape does not match channel specifications.
    """
    if isinstance(stream, Queue):
        stream = cast(Iterable[Data], Buffer(stream))
    if thread:
        Thread(
            target=self.consume, kwargs={"stream": stream, "fps": fps}
        ).start()
        return

    fourcc = self._cv2_module.VideoWriter_fourcc(*'MJPG')  # type: ignore
    writer = self._cv2_module.VideoWriter(
        self.path, fourcc, fps, (self.shape[1], self.shape[0]))
    for frame in stream:
        if not isinstance(frame, np.ndarray):
            raise ValueError("VideoChannel does not allow raw data.")
        self._verify_type(frame)
        writer.write(self._cv2_module.cvtColor(
            frame, self._cv2_module.COLOR_RGB2BGR))
    writer.release()
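
A sketch of the queue-based mode (the path and frame contents are hypothetical): the writer runs in a background thread, and a `None` sentinel ends the stream.

Example

from queue import Queue

import numpy as np
from roverd.channels import VideoChannel

video = VideoChannel("data/camera.avi", dtype="u1", shape=(480, 640, 3))

frames: Queue = Queue()
video.consume(frames, thread=True, fps=30.0)  # returns immediately

for _ in range(100):
    frames.put(np.zeros((480, 640, 3), dtype=np.uint8))
frames.put(None)  # signal end of stream; the writer thread finishes up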

read

read(
    start: int | integer = 0, samples: int | integer = -1
) -> Shaped[ndarray, "samples ..."]

Read data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `start` | `int \| integer` | start index to read. | `0` |
| `samples` | `int \| integer` | number of samples/frames to read. If `-1`, read all data. | `-1` |

Returns:

| Type | Description |
| --- | --- |
| `Shaped[ndarray, 'samples ...']` | Read frames as an array, with a leading axis corresponding to the number of `samples`. If only a subset of frames are readable (e.g. due to reaching the end of the video), the result is truncated. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | None of the frames could be read, possibly due to an invalid video, or invalid start index. |

Source code in format/src/roverd/channels/video.py
def read(
    self, start: int | np.integer = 0, samples: int | np.integer = -1
) -> Shaped[np.ndarray, "samples ..."]:
    """Read data.

    Args:
        start: start index to read.
        samples: number of samples/frames to read. If `-1`, read all data.

    Returns:
        Read frames as an array, with a leading axis corresponding to
            the number of `samples`. If only a subset of frames are
            readable (e.g. due to reaching the end of the video), the
            result is truncated.

    Raises:
        ValueError: None of the frames could be read, possibly due to
            an invalid video, or invalid start index.
    """
    cap = self._cv2_module.VideoCapture(self.path)

    if start != 0:
        # CAP_PROP_POS_FRAMES is the 0-based index of the next frame to
        # decode, so seek directly to `start`.
        cap.set(self._cv2_module.CAP_PROP_POS_FRAMES, start)

    frames: list[np.ndarray] = []
    while cap.isOpened():
        ret, frame = cap.read()
        # `len(frames) != samples` is always true when `samples == -1`,
        # so in that case we read until the video is exhausted.
        if ret and len(frames) != samples:
            frames.append(frame[..., ::-1])
        else:
            break

    cap.release()
    if len(frames) == 0:
        raise ValueError("Could not read any frames.")
    return np.stack(frames)
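
For example (hypothetical path; the leading axis of the result is the number of frames actually read):

Example

from roverd.channels import VideoChannel

video = VideoChannel("data/camera.avi", dtype="u1", shape=(480, 640, 3))

clip = video.read(start=100, samples=10)  # shape (10, 480, 640, 3), or fewer
everything = video.read()                 # all frames from the start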

stream

stream(
    transform: Callable[[Shaped[ndarray, ...]], Any] | None = None,
    batch: int = 0,
) -> Iterator[ndarray]

Get iterable data stream.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `transform` | `Callable[[Shaped[ndarray, ...]], Any] \| None` | callable to apply to the read data. Should take a single sample or batch of samples, and can return an arbitrary type. | `None` |
| `batch` | `int` | batch size to read. If 0, load only a single sample and do not append an empty axis. | `0` |

Returns:

| Type | Description |
| --- | --- |
| `Iterator[ndarray]` | Iterator which yields successive frames. |

Source code in format/src/roverd/channels/video.py
def stream(
    self, transform: Callable[
        [Shaped[np.ndarray, "..."]], Any] | None = None, batch: int = 0
) -> Iterator[np.ndarray]:
    """Get iterable data stream.

    Args:
        transform: callable to apply to the read data. Should take a single
            sample or batch of samples, and can return an arbitrary type.
        batch: batch size to read. If 0, load only a single sample and do
            not append an empty axis.

    Returns:
        Iterator which yields successive frames.
    """
    if transform is None:
        transform = lambda x: x

    cap = self._cv2_module.VideoCapture(self.path)
    frames: list[np.ndarray] = []
    while cap.isOpened():
        if batch != 0 and len(frames) == batch:
            yield transform(np.stack(frames))
            frames = []

        ret, frame = cap.read()
        if ret:
            if batch == 0:
                yield transform(frame[..., ::-1])
            else:
                frames.append(frame[..., ::-1])
        else:
            break

    if len(frames) > 0:
        yield transform(np.stack(frames))

    cap.release()
    return
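
For example, frames can be reduced on the fly while streaming in batches (hypothetical path; the final batch may be smaller than `batch`):

Example

from roverd.channels import VideoChannel

video = VideoChannel("data/camera.avi", dtype="u1", shape=(480, 640, 3))

# Average over the color channel to get grayscale batches of 8 frames.
for gray in video.stream(transform=lambda x: x.mean(axis=-1), batch=8):
    print(gray.shape)  # (8, 480, 640), except possibly the last batch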

write

write(data: Data, append: bool = False) -> None

Write data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `Data` | data to write. | *required* |
| `append` | `bool` | if `True`, append to the file instead of overwriting it. | `False` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | data type/shape does not match channel specifications. |

Source code in format/src/roverd/channels/video.py
def write(
    self, data: Data, append: bool = False
) -> None:
    """Write data.

    Args:
        data: data to write.
        append: if `True`, append to the file instead of overwriting it.

    Raises:
        ValueError: data type/shape does not match channel specifications.
    """
    if not isinstance(data, np.ndarray):
        raise ValueError("VideoChannel can only write numpy arrays.")
    self.consume(frame for frame in data)
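
Since `write` delegates to `consume`, it encodes each frame along the leading axis at the default framerate (hypothetical path and data):

Example

import numpy as np
from roverd.channels import VideoChannel

video = VideoChannel("data/camera.avi", dtype="u1", shape=(480, 640, 3))

# Three seconds of black frames at the default 10 FPS.
video.write(np.zeros((30, 480, 640, 3), dtype=np.uint8))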

roverd.channels.from_config

from_config(
    path: str,
    format: str,
    type: str,
    shape: Sequence[int],
    description: str | None = None,
    desc: str | None = None,
    args: dict = {},
) -> Channel

Create channel from configuration.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str` | File path to the channel data. | *required* |
| `format` | `str` | channel format name. | *required* |
| `type` | `str` | data type, using numpy size-in-bytes convention (e.g. `u2` for 2-byte/16-bit unsigned integer). | *required* |
| `shape` | `Sequence[int]` | shape of the non-time dimensions. | *required* |
| `description` | `str \| None` | human-readable channel description; accepted so configuration dicts can be passed directly, and not otherwise used. | `None` |
| `desc` | `str \| None` | alias for `description`; likewise unused by the constructor call. | `None` |
| `args` | `dict` | other arguments to pass to the channel constructor. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Channel` | Initialized channel object. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | the `format` does not name a known channel type. |

Source code in format/src/roverd/channels/__init__.py
def from_config(  # noqa: D417
    path: str, format: str, type: str, shape: Sequence[int],
    description: str | None = None, desc: str | None = None,
    args: dict = {}
) -> Channel:
    """Create channel from configuration.

    Args:
        path: File path to the channel data.
        format: channel format name.
        type: data type, using numpy size-in-bytes convention (e.g. u2 for
            2-byte/16-bit unsigned integer).
        shape: shape of the non-time dimensions.
        args: other arguments to pass to the channel constructor.

    Returns:
        Initialized channel object.
    """
    ctype = CHANNEL_TYPES.get(format)
    if ctype is None:
        raise ValueError(f"Unknown channel format: {format}")
    return ctype(path=path, dtype=type, shape=shape, **args)
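
For example, unpacking a channel configuration dict (hypothetical values):

Example

from roverd.channels import from_config

config = {
    "format": "mjpg", "type": "u1", "shape": [480, 640, 3],
    "desc": "front-facing RGB camera",
}
channel = from_config(path="data/camera.avi", **config)  # -> VideoChannel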