Skip to content

roverd

Roverd: data format and data loading.

ADL-Compliant

The roverd package implements a fully Abstract Dataloader-compliant map-style data loader.

Thus, to use the dataloader in practice, in addition to writing custom ADL-compliant components, you can use generic ADL components:

Fully Typed

The roverd dataloader is fully typed using generic dataclasses of jaxtyping arrays following the Abstract Dataloader's recommendations, and comes with a type library which describes the data types collected by the red-rover system.

To use roverd, you can either use the high level interfaces to load a complete dataset consisting of multiple traces, or use lower-level APIs to load a single trace, single sensor, or a single "channel" within a sensor.

  • High level APIs are generally preferred, and include descriptive types.
  • Lower level APIs should generally be avoided, but are required to modify the data; high-level APIs are intentionally read only.
>>> from roverd import Dataset, sensors
>>> from abstract_dataloader import generic
>>> dataset = Dataset.from_config(
        Dataset.find_traces("/data/grt"),
        sync=generic.Nearest("lidar", tol=0.1),
        sensors={"radar": sensors.XWRRadar, "lidar": sensors.OSLidar})
>>> dataset
Dataset(166 traces, n=1139028)
>>> dataset[42]
{'radar': XWRRadarIQ(...), 'lidar': OSLidarData(...)}
>>> from roverd import Trace, sensors
>>> from abstract_dataloader import generic
>>> trace = Trace.from_config(
        "/data/grt/bike/point.back",
        sync=generic.Nearest("lidar"),
        sensors={"radar": sensors.XWRRadar, "lidar": sensors.OSLidar})
>>> trace['radar']
XWRRadar(/data/grt/bike/point.back/radar: [ts, iq, valid])
>>> trace[42]
{'radar': XWRRadarIQ(...), 'lidar': OSLidarData(...)}
>>> from roverd import Trace
>>> from abstract_dataloader import generic
>>> trace = Trace.from_config(
        "/data/grt/bike/point.back", sync=generic.Nearest("lidar"))
>>> trace
Trace(/data/grt/bike/point.back, 12195x[radar, camera, lidar, imu])
>>> trace['radar']
DynamicSensor(/data/grt/bike/point.back/radar: [ts, iq, valid])
>>> trace[42]
{'radar': {...}, 'camera': {...}, 'lidar': {...}, ...}
>>> from roverd.sensors import XWRRadar
>>> radar = XWRRadar("/data/grt/bike/point.back/radar")
>>> radar
XWRRadar(/data/grt/bike/point.back/radar: [ts, iq, valid])
>>> len(radar)
24576
>>> radar[42]
XWRRadarIQ(
    iq=int16[1 64 3 4 512], timestamps=float64[1], valid=uint8[1],
    range_resolution=float32[1], doppler_resolution=float32[1])

roverd.Dataset

Bases: Dataset[TSample]

A dataset, consisting of multiple traces.

Type Parameters
  • Sample: sample data type which this Dataset returns. As a convention, we suggest returning "batched" data by default, i.e. with a leading singleton axis.

Parameters:

Name Type Description Default
traces Sequence[Trace[TSample]]

traces which make up this dataset; must be roverd traces!

required
Source code in format/src/roverd/trace.py
class Dataset(abstract.Dataset[TSample]):
    """A dataset, consisting of multiple traces.

    Type Parameters:
        - `Sample`: sample data type which this `Dataset` returns. As a
            convention, we suggest returning "batched" data by default, i.e.
            with a leading singleton axis.

    Args:
        traces: traces which make up this dataset; must be `roverd` traces!
    """

    def __init__(self, traces: Sequence[Trace[TSample]]) -> None:
        # Traces are stored as-is; index-to-trace resolution is presumably
        # handled by the parent `abstract.Dataset` -- confirm upstream.
        self.traces = traces

    @staticmethod
    def find_traces(
        *paths: str, follow_symlinks: bool = False
    ) -> list[str]:
        """Walk a directory (or list of directories) to find all datasets.

        Datasets are defined by directories containing a `config.yaml` file.

        !!! warning

            This method does not follow symlinks by default. If you have a
            circular symlink, and `follow_symlinks=True`, this method will loop
            infinitely!

        Args:
            paths: a (list) of filepaths.
            follow_symlinks: whether to follow symlinks.
        """
        # Depth-first recursion: a directory containing `config.yaml` is a
        # leaf (a single trace); otherwise descend into its subdirectories.
        def _find(path: str) -> list[str]:
            if os.path.exists(os.path.join(path, "config.yaml")):
                return [path]
            else:
                contents = (
                    os.path.join(path, s.name) for s in os.scandir(path)
                    if s.is_dir(follow_symlinks=follow_symlinks))
                # `sum(..., start=[])` concatenates the per-child lists.
                return sum((_find(c) for c in contents), start=[])

        return sum((_find(p) for p in paths), start=[])

    @classmethod
    def from_config(
        cls, paths: Sequence[str],
        sync: spec.Synchronization = generic.Empty(),
        sensors: Mapping[
            str, Callable[[str], Sensor] | str | None] | None = None,
        include_virtual: bool = False, workers: int = 0
    ) -> "Dataset":
        """Create a dataset from a list of directories containing recordings.

        Constructor arguments are forwarded to [`Trace.from_config`][^^.].

        !!! tip

            Set `workers=-1` to initialize all traces in parallel. This can
            greatly speed up initialization on highly distributed filesystems,
            e.g. blob stores!

        Args:
            paths: paths to trace directories.
            sync: synchronization protocol.
            sensors: sensor types to use.
            include_virtual: if `True`, include virtual sensors as well.
            workers: number of worker threads to use during initialization. If
                `=0`, load synchronously; if `<0`, load all traces in parallel.
        """
        # Negative worker counts mean "one worker per trace". Note that an
        # empty `paths` collapses to the synchronous branch below.
        if workers < 0:
            workers = len(paths)

        if workers == 0:
            # Synchronous path: initialize traces one at a time.
            traces = [
                Trace.from_config(
                    p, sync=sync, sensors=sensors,
                    include_virtual=include_virtual)
                for p in paths]
        else:
            # NOTE(review): presumably `ThreadPool` is
            # `multiprocessing.pool.ThreadPool` (threads suit this I/O-bound
            # initialization) -- confirm against the module imports.
            with ThreadPool(workers) as pool:
                traces = pool.map(
                    lambda p: Trace.from_config(
                        p, sync=sync, sensors=sensors,
                        include_virtual=include_virtual),
                    paths)

        # The cast silences a list-vs-Sequence variance mismatch only.
        return cls(traces=cast(list[Trace[TSample]], traces))  # type: ignore

    def __getitem__(self, index: int | np.integer) -> TSample:
        """Fetch item from this dataset by global index.

        Args:
            index: sample index.

        Returns:
            loaded sample.

        Raises:
            IndexError: provided index is out of bounds.
        """
        # Re-declared only to attach a more specific docstring; all behavior
        # lives in the parent class implementation.
        return super().__getitem__(index)

    def __len__(self) -> int:
        """Total number of samples in this dataset."""
        # Docstring-only override; the count itself comes from the parent.
        return super().__len__()

__getitem__

__getitem__(index: int | integer) -> TSample

Fetch item from this dataset by global index.

Parameters:

Name Type Description Default
index int | integer

sample index.

required

Returns:

Type Description
TSample

loaded sample.

Raises:

Type Description
IndexError

provided index is out of bounds.

Source code in format/src/roverd/trace.py
def __getitem__(self, index: int | np.integer) -> TSample:
    """Fetch item from this dataset by global index.

    Args:
        index: sample index.

    Returns:
        loaded sample.

    Raises:
        IndexError: provided index is out of bounds.
    """
    # Re-declared only to attach a more specific docstring; all behavior
    # lives in the parent class implementation.
    return super().__getitem__(index)

__len__

__len__() -> int

Total number of samples in this dataset.

Source code in format/src/roverd/trace.py
def __len__(self) -> int:
    """Total number of samples in this dataset."""
    # Docstring-only override; the count itself comes from the parent class.
    return super().__len__()

find_traces staticmethod

find_traces(*paths: str, follow_symlinks: bool = False) -> list[str]

Walk a directory (or list of directories) to find all datasets.

Datasets are defined by directories containing a config.yaml file.

Warning

This method does not follow symlinks by default. If you have a circular symlink, and follow_symlinks=True, this method will loop infinitely!

Parameters:

Name Type Description Default
paths str

a (list) of filepaths.

()
follow_symlinks bool

whether to follow symlinks.

False
Source code in format/src/roverd/trace.py
@staticmethod
def find_traces(
    *paths: str, follow_symlinks: bool = False
) -> list[str]:
    """Walk a directory (or list of directories) to find all datasets.

    Datasets are defined by directories containing a `config.yaml` file.

    !!! warning

        This method does not follow symlinks by default. If you have a
        circular symlink, and `follow_symlinks=True`, this method will loop
        infinitely!

    Args:
        paths: a (list) of filepaths.
        follow_symlinks: whether to follow symlinks.
    """
    def _find(root: str) -> list[str]:
        # A directory holding a `config.yaml` is itself a dataset; do not
        # descend any further.
        if os.path.exists(os.path.join(root, "config.yaml")):
            return [root]

        # Otherwise, recurse into each subdirectory and collect results.
        found: list[str] = []
        for entry in os.scandir(root):
            if entry.is_dir(follow_symlinks=follow_symlinks):
                found.extend(_find(os.path.join(root, entry.name)))
        return found

    matches: list[str] = []
    for p in paths:
        matches.extend(_find(p))
    return matches

from_config classmethod

from_config(
    paths: Sequence[str],
    sync: Synchronization = Empty(),
    sensors: Mapping[str, Callable[[str], Sensor] | str | None] | None = None,
    include_virtual: bool = False,
    workers: int = 0,
) -> Dataset

Create a dataset from a list of directories containing recordings.

Constructor arguments are forwarded to Trace.from_config.

Tip

Set workers=-1 to initialize all traces in parallel. This can greatly speed up initialization on highly distributed filesystems, e.g. blob stores!

Parameters:

Name Type Description Default
paths Sequence[str]

paths to trace directories.

required
sync Synchronization

synchronization protocol.

Empty()
sensors Mapping[str, Callable[[str], Sensor] | str | None] | None

sensor types to use.

None
include_virtual bool

if True, include virtual sensors as well.

False
workers int

number of worker threads to use during initialization. If =0, load synchronously; if <0, load all traces in parallel.

0
Source code in format/src/roverd/trace.py
@classmethod
def from_config(
    cls, paths: Sequence[str],
    sync: spec.Synchronization = generic.Empty(),
    sensors: Mapping[
        str, Callable[[str], Sensor] | str | None] | None = None,
    include_virtual: bool = False, workers: int = 0
) -> "Dataset":
    """Create a dataset from a list of directories containing recordings.

    Constructor arguments are forwarded to [`Trace.from_config`][^^.].

    !!! tip

        Set `workers=-1` to initialize all traces in parallel. This can
        greatly speed up initialization on highly distributed filesystems,
        e.g. blob stores!

    Args:
        paths: paths to trace directories.
        sync: synchronization protocol.
        sensors: sensor types to use.
        include_virtual: if `True`, include virtual sensors as well.
        workers: number of worker threads to use during initialization. If
            `=0`, load synchronously; if `<0`, load all traces in parallel.
    """
    # Negative worker counts mean "one worker per trace"; an empty `paths`
    # collapses to the synchronous branch below.
    if workers < 0:
        workers = len(paths)

    if workers == 0:
        # Synchronous path: initialize traces one at a time.
        traces = [
            Trace.from_config(
                p, sync=sync, sensors=sensors,
                include_virtual=include_virtual)
            for p in paths]
    else:
        # NOTE(review): presumably `ThreadPool` is
        # `multiprocessing.pool.ThreadPool` (threads suit this I/O-bound
        # initialization) -- confirm against the module imports.
        with ThreadPool(workers) as pool:
            traces = pool.map(
                lambda p: Trace.from_config(
                    p, sync=sync, sensors=sensors,
                    include_virtual=include_virtual),
                paths)

    # The cast silences a list-vs-Sequence variance mismatch only.
    return cls(traces=cast(list[Trace[TSample]], traces))  # type: ignore

roverd.Trace

Bases: Trace[TSample]

A single trace, containing multiple sensors.

Type Parameters
  • Sample: sample data type which this Sensor returns. As a convention, we suggest returning "batched" data by default, i.e. with a leading singleton axis.

Parameters:

Name Type Description Default
sensors Mapping[str, Sensor]

sensors which make up this trace.

required
sync Synchronization | Mapping[str, Integer[ndarray, N]] | None

synchronization protocol used to create global samples from asynchronous time series. If Mapping; the provided indices are used directly; if None, sensors are expected to already be synchronous (equivalent to passing {k: np.arange(N), ...}).

None
name str

friendly name; should only be used for debugging and inspection.

'trace'
Source code in format/src/roverd/trace.py
class Trace(abstract.Trace[TSample]):
    """A single trace, containing multiple sensors.

    Type Parameters:
        - `Sample`: sample data type which this `Sensor` returns. As a
            convention, we suggest returning "batched" data by default, i.e.
            with a leading singleton axis.

    Args:
        sensors: sensors which make up this trace.
        sync: synchronization protocol used to create global samples from
            asynchronous time series. If `Mapping`; the provided indices are
            used directly; if `None`, sensors are expected to already be
            synchronous (equivalent to passing `{k: np.arange(N), ...}`).
        name: friendly name; should only be used for debugging and inspection.
    """

    @staticmethod
    def find_sensors(path: str, virtual: bool = False) -> list[str]:
        """Find all (non-virtual) sensors in a given directory."""
        # A sensor is a subdirectory containing a `meta.json` manifest;
        # names starting with `_` denote virtual sensors and are excluded
        # unless `virtual=True`.
        def is_valid(p: str) -> bool:
            return (
                os.path.isdir(os.path.join(path, p))
                and (virtual or not p.startswith('_'))
                and os.path.exists(os.path.join(path, p, "meta.json")))

        return [p for p in os.listdir(path) if is_valid(p)]

    @classmethod
    def from_config(
        cls, path: str, sync: spec.Synchronization = generic.Empty(),
        sensors: Mapping[
            str, Callable[[str], Sensor] | str | None] | None = None,
        include_virtual: bool = False, name: str | None = None,
    ) -> "Trace":
        """Create a trace from a directory containing a single recording.

        Sensor types can be specified by:

        - `None`: use the [`DynamicSensor`][roverd.sensors.DynamicSensor] type.
        - `"auto"`: return a known sensor type if applicable; see
            [`roverd.sensors`][roverd.sensors].
        - `Callable[[str], Sensor]`: a sensor constructor, which has all
            non-path arguments closed on.
        - `Sensor`: an already initialized sensor instance.

        !!! info

            Sensors can also be inferred automatically (`sensors: None`), in
            which case we find and load all sensors in the directory, excluding
            virtual sensors (those starting with `_`) unless
            `include_virtual=True`. Each sensor is then initialized as a
            `DynamicSensor`.

        Args:
            path: path to trace directory.
            sync: synchronization protocol.
            sensors: sensor types to use.
            include_virtual: if `True`, include virtual sensors as well.
            name: friendly name; if not provided, defaults to the given `path`.
        """
        # No sensor spec: discover sensors on disk, each with `None` type
        # (i.e. the default sensor type).
        if sensors is None:
            _sensors = Trace.find_sensors(path, virtual=include_virtual)
            sensors = {k: None for k in _sensors}

        # `"auto"` passes the mapping key (the sensor's directory name) as
        # the type; resolution itself is done by the `from_config` helper.
        initialized = {
            k: from_config(
                os.path.join(path, k), type=(k if v == "auto" else v))
            for k, v in sensors.items()
        }

        # Ignore this type error here until abstract-dataloader switches to
        # `Mapping`.
        return cls(
            sensors=initialized, sync=sync,  # type: ignore
            name=path if name is None else name)

    @cached_property
    def filesize(self):
        """Total filesize, in bytes.

        !!! warning

            The trace must be initialized with all sensors for this
            calculation to be correct.
        """
        # Sensors lacking a `filesize` attribute contribute 0.
        return sum(getattr(s, 'filesize', 0) for s in self.sensors.values())

    @cached_property
    def datarate(self):
        """Total data rate, in bytes/sec.

        !!! warning

            The trace must be initialized with all sensors for this
            calculation to be correct.
        """
        # Sensors lacking a `datarate` attribute contribute 0.
        return sum(getattr(s, 'datarate', 0) for s in self.sensors.values())

    @overload
    def __getitem__(self, index: str) -> Sensor: ...

    @overload
    def __getitem__(self, index: int | np.integer) -> TSample: ...

    def __getitem__(
        self, index: int | np.integer | str
    ) -> TSample | spec.Sensor:
        """Get sample from synchronized index (or fetch a sensor by name).

        !!! tip

            For convenience, traces can be indexed by a `str` sensor name,
            returning that [`Sensor`][abstract_dataloader.spec.].

        Args:
            index: sample index, or sensor name.

        Returns:
            Loaded sample if `index` is an integer type, or the appropriate
                [`Sensor`][abstract_dataloader.spec.] if `index` is a `str`.
        """
        # Re-declared only to attach a more specific docstring; all behavior
        # lives in the parent class implementation.
        return super().__getitem__(index)

    def __len__(self) -> int:
        """Total number of sensor-tuple samples."""
        # Docstring-only override; the count itself comes from the parent.
        return super().__len__()

datarate cached property

datarate

Total data rate, in bytes/sec.

Warning

The trace must be initialized with all sensors for this calculation to be correct.

filesize cached property

filesize

Total filesize, in bytes.

Warning

The trace must be initialized with all sensors for this calculation to be correct.

__getitem__

__getitem__(index: str) -> Sensor
__getitem__(index: int | integer) -> TSample
__getitem__(index: int | integer | str) -> TSample | Sensor

Get sample from synchronized index (or fetch a sensor by name).

Tip

For convenience, traces can be indexed by a str sensor name, returning that Sensor.

Parameters:

Name Type Description Default
index int | integer | str

sample index, or sensor name.

required

Returns:

Type Description
TSample | Sensor

Loaded sample if index is an integer type, or the appropriate Sensor if index is a str.

Source code in format/src/roverd/trace.py
def __getitem__(
    self, index: int | np.integer | str
) -> TSample | spec.Sensor:
    """Get sample from synchronized index (or fetch a sensor by name).

    !!! tip

        For convenience, traces can be indexed by a `str` sensor name,
        returning that [`Sensor`][abstract_dataloader.spec.].

    Args:
        index: sample index, or sensor name.

    Returns:
        Loaded sample if `index` is an integer type, or the appropriate
            [`Sensor`][abstract_dataloader.spec.] if `index` is a `str`.
    """
    # Re-declared only to attach a more specific docstring; all behavior
    # lives in the parent class implementation.
    return super().__getitem__(index)

__len__

__len__() -> int

Total number of sensor-tuple samples.

Source code in format/src/roverd/trace.py
def __len__(self) -> int:
    """Total number of sensor-tuple samples."""
    # Docstring-only override; the count itself comes from the parent class.
    return super().__len__()

find_sensors staticmethod

find_sensors(path: str, virtual: bool = False) -> list[str]

Find all (non-virtual) sensors in a given directory.

Source code in format/src/roverd/trace.py
@staticmethod
def find_sensors(path: str, virtual: bool = False) -> list[str]:
    """Find all (non-virtual) sensors in a given directory."""
    found: list[str] = []
    for entry in os.listdir(path):
        # A sensor is a subdirectory with a `meta.json` manifest; names
        # starting with `_` are virtual and skipped unless requested.
        if not os.path.isdir(os.path.join(path, entry)):
            continue
        if entry.startswith('_') and not virtual:
            continue
        if os.path.exists(os.path.join(path, entry, "meta.json")):
            found.append(entry)
    return found

from_config classmethod

from_config(
    path: str,
    sync: Synchronization = Empty(),
    sensors: Mapping[str, Callable[[str], Sensor] | str | None] | None = None,
    include_virtual: bool = False,
    name: str | None = None,
) -> Trace

Create a trace from a directory containing a single recording.

Sensor types can be specified by:

  • None: use the DynamicSensor type.
  • "auto": return a known sensor type if applicable; see roverd.sensors.
  • Callable[[str], Sensor]: a sensor constructor, which has all non-path arguments closed on.
  • Sensor: an already initialized sensor instance.

Info

Sensors can also be inferred automatically (sensors: None), in which case we find and load all sensors in the directory, excluding virtual sensors (those starting with _) unless include_virtual=True. Each sensor is then initialized as a DynamicSensor.

Parameters:

Name Type Description Default
path str

path to trace directory.

required
sync Synchronization

synchronization protocol.

Empty()
sensors Mapping[str, Callable[[str], Sensor] | str | None] | None

sensor types to use.

None
include_virtual bool

if True, include virtual sensors as well.

False
name str | None

friendly name; if not provided, defaults to the given path.

None
Source code in format/src/roverd/trace.py
@classmethod
def from_config(
    cls, path: str, sync: spec.Synchronization = generic.Empty(),
    sensors: Mapping[
        str, Callable[[str], Sensor] | str | None] | None = None,
    include_virtual: bool = False, name: str | None = None,
) -> "Trace":
    """Create a trace from a directory containing a single recording.

    Sensor types can be specified by:

    - `None`: use the [`DynamicSensor`][roverd.sensors.DynamicSensor] type.
    - `"auto"`: return a known sensor type if applicable; see
        [`roverd.sensors`][roverd.sensors].
    - `Callable[[str], Sensor]`: a sensor constructor, which has all
        non-path arguments closed on.
    - `Sensor`: an already initialized sensor instance.

    !!! info

        Sensors can also be inferred automatically (`sensors: None`), in
        which case we find and load all sensors in the directory, excluding
        virtual sensors (those starting with `_`) unless
        `include_virtual=True`. Each sensor is then initialized as a
        `DynamicSensor`.

    Args:
        path: path to trace directory.
        sync: synchronization protocol.
        sensors: sensor types to use.
        include_virtual: if `True`, include virtual sensors as well.
        name: friendly name; if not provided, defaults to the given `path`.
    """
    # No sensor spec: discover sensors on disk, each with `None` type
    # (i.e. the default sensor type).
    if sensors is None:
        _sensors = Trace.find_sensors(path, virtual=include_virtual)
        sensors = {k: None for k in _sensors}

    # `"auto"` passes the mapping key (the sensor's directory name) as the
    # type; resolution itself is done by the `from_config` helper.
    initialized = {
        k: from_config(
            os.path.join(path, k), type=(k if v == "auto" else v))
        for k, v in sensors.items()
    }

    # Ignore this type error here until abstract-dataloader switches to
    # `Mapping`.
    return cls(
        sensors=initialized, sync=sync,  # type: ignore
        name=path if name is None else name)

roverd.split

split(
    dataset: Trace[TSample], start: float = 0.0, end: float = 0.0
) -> Trace[TSample]
split(
    dataset: Dataset[TSample], start: float = 0.0, end: float = 0.0
) -> Dataset[TSample]
split(
    dataset: Dataset[TSample] | Trace[TSample],
    start: float = 0.0,
    end: float = 0.0,
) -> Dataset[TSample] | Trace[TSample]

Get sub-trace or sub-dataset.

Parameters:

Name Type Description Default
dataset Dataset[TSample] | Trace[TSample]

trace or dataset to split.

required
start float

start of the split, as a proportion of the trace length (0-1).

0.0
end float

end of the split, as a proportion of the trace length (0-1).

0.0

Returns:

Type Description
Dataset[TSample] | Trace[TSample]

Trace or dataset with a contiguous subset of samples according to the start and end indices.

Source code in format/src/roverd/trace.py
def split(
    dataset: Dataset[TSample] | Trace[TSample],
    start: float = 0.0, end: float = 0.0
) -> Dataset[TSample] | Trace[TSample]:
    """Get sub-trace or sub-dataset.

    Args:
        dataset: trace or dataset to split.
        start: start of the split, as a proportion of the trace length (`0-1`).
        end: end of the split, as a proportion of the trace length (`0-1`).

    Returns:
        Trace or dataset with a contiguous subset of samples according to the
            start and end indices.
    """
    # NOTE(review): the range check requires `start < end`, so the default
    # arguments (start=0.0, end=0.0) always raise ValueError; callers must
    # pass `end` explicitly. Consider defaulting `end` to 1.0.
    if not (0 <= start < end <= 1):
        raise ValueError(
            f"Invalid split range: {start} - {end} (must be in [0, 1])")

    if isinstance(dataset, Trace):
        # Make dummy indices if `None`.
        if dataset.indices is None:
            indices = {
                k: np.arange(len(v), dtype=np.int32)
                for k, v in dataset.sensors.items()}
        else:
            indices = dataset.indices

        # Slice bounds come from the first sensor's index array only;
        # presumably all synchronized index arrays share the same length --
        # TODO confirm. The `for/else` raises if there are no sensors.
        for v in indices.values():
            istart = int(len(v) * start)
            iend = int(len(v) * end)
            break
        else:
            raise ValueError("There must be at least one sensor.")

        # Copy so the sub-trace does not hold views into (and keep alive)
        # the parent's index arrays.
        return Trace(
            sensors=dataset.sensors,
            sync={k: np.copy(v[istart:iend]) for k, v in indices.items()})

    else:  # Dataset
        # Recursively split each trace; sensors are shared, indices are new.
        return Dataset(traces=[
            split(t, start=start, end=end) for t in dataset.traces])