
roverd.sensors

Rover Sensors.

| Name | Class | Description | Timestamp Correction |
| --- | --- | --- | --- |
| radar | XWRRadar | 4D radar | IID |
| lidar | OSLidarDepth | Ouster OS0/1/2 lidar depth data | IID + frame drops |
| | OSLidar | Ouster lidar with reflectance and NIR | |
| camera | Camera | Generic RGB camera | IID |
| _camera | Semseg | Generic image semantic segmentation | IID |
| imu | IMU | 3-axis accelerometer + gyroscope | IID |
| (other) | DynamicSensor | Generic sensor with dynamic channels | none |

roverd.sensors.Camera

Bases: Sensor[CameraData[ndarray], Metadata]

Generic RGB camera.

Parameters:

Name Type Description Default
path str

path to sensor data directory. Must contain a meta.json file; see the dataset format specifications.

required
key str

video channel name.

'video.avi'
correction str | None | Callable[[Float64[ndarray, N]], Float64[ndarray, N]]

optional timestamp correction to apply (e.g., smoothing); can be a callable, a string (name of a callable in roverd.timestamps), or None. If "auto", uses smooth(interval=30.).

None
past int

number of past samples to include.

0
future int

number of future samples to include.

0
Source code in format/src/roverd/sensors/camera.py
class Camera(Sensor[types.CameraData[np.ndarray], generic.Metadata]):
    """Generic RGB camera.

    Args:
        path: path to sensor data directory. Must contain a `meta.json` file;
            see the dataset format specifications.
        key: video channel name.
        correction: optional timestamp correction to apply (e.g.,
            smoothing); can be a callable, a string (name of a callable in
            [`roverd.timestamps`][roverd.timestamps]), or `None`. If `"auto"`,
            uses `smooth(interval=30.)`.
        past: number of past samples to include.
        future: number of future samples to include.
    """

    def __init__(
        self, path: str, key: str = "video.avi",
        correction: str | None | Callable[
            [Float64[np.ndarray, "N"]], Float64[np.ndarray, "N"]] = None,
        past: int = 0, future: int = 0
    ) -> None:
        if correction == "auto":
            correction = partial(timestamps.smooth, interval=30.)

        super().__init__(path, correction=correction, past=past, future=future)
        self.metadata = generic.Metadata(
            timestamps=self.correction(
                self.channels["ts"].read(start=0, samples=-1)))
        self.key = key

    @overload
    def __getitem__(self, index: int | np.integer) -> types.CameraData: ...

    @overload
    def __getitem__(self, index: str) -> channels.Channel: ...

    def __getitem__(
        self, index: int | np.integer | str
    ) -> types.CameraData[np.ndarray] | channels.Channel:
        """Read camera data by index.

        Args:
            index: frame index, or channel name.

        Returns:
            Camera data, or channel object if `index` is a string.
        """
        if isinstance(index, str):
            return self.channels[index]
        else: # int | np.integer
            return types.CameraData(
                image=self.channels[self.key].read(
                    index - self.past, samples=self.window)[None],
                timestamps=self.metadata.timestamps[
                    index - self.past:index + self.future + 1][None])

__getitem__

__getitem__(index: int | integer) -> CameraData
__getitem__(index: str) -> Channel
__getitem__(index: int | integer | str) -> CameraData[ndarray] | Channel

Read camera data by index.

Parameters:

Name Type Description Default
index int | integer | str

frame index, or channel name.

required

Returns:

Type Description
CameraData[ndarray] | Channel

Camera data, or channel object if index is a string.

Source code in format/src/roverd/sensors/camera.py
def __getitem__(
    self, index: int | np.integer | str
) -> types.CameraData[np.ndarray] | channels.Channel:
    """Read camera data by index.

    Args:
        index: frame index, or channel name.

    Returns:
        Camera data, or channel object if `index` is a string.
    """
    if isinstance(index, str):
        return self.channels[index]
    else: # int | np.integer
        return types.CameraData(
            image=self.channels[self.key].read(
                index - self.past, samples=self.window)[None],
            timestamps=self.metadata.timestamps[
                index - self.past:index + self.future + 1][None])
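
The windowing in `__getitem__` above can be illustrated on a plain array. This is a minimal sketch rather than the library's code: `window_slice` is a hypothetical helper, and the sketch assumes that `self.window` equals `past + future + 1` (the base class that defines it is not shown in this section).

```python
import numpy as np


def window_slice(timestamps: np.ndarray, index: int,
                 past: int = 0, future: int = 0) -> np.ndarray:
    """Hypothetical helper mirroring the timestamp slice in `__getitem__`.

    Returns an array of shape (1, past + future + 1); the leading axis added
    by `[None]` acts as a batch dimension.
    """
    return timestamps[index - past:index + future + 1][None]


ts = np.arange(10, dtype=np.float64)  # stand-in for corrected timestamps
window = window_slice(ts, index=4, past=2, future=1)
print(window.shape)  # (1, 4)
print(window)        # [[2. 3. 4. 5.]]
```

With the defaults `past=0` and `future=0`, the window holds a single sample and the result has shape `(1, 1)`.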

roverd.sensors.DynamicSensor

Bases: Sensor[TGenericSample, Metadata]

Generic sensor type with dynamically configured data types.

Parameters:

Name Type Description Default
path str

path to sensor data directory. Must contain a meta.json file; see the dataset format specifications.

required
create bool

if True, create a new sensor at the specified path if one is not already present.

False
exist_ok bool

if True, do not raise an error if create=True and the sensor already exists.

False
subset Sequence[str] | None

if specified, only read the listed channels.

None
correction str | None | Callable[[Float64[ndarray, N]], Float64[ndarray, N]]

optional timestamp correction to apply (e.g., smoothing); can be a callable, a string (name of a callable in roverd.timestamps), or None.

None
past int

number of past samples to include.

0
future int

number of future samples to include.

0
Source code in format/src/roverd/sensors/generic.py
class DynamicSensor(Sensor[TGenericSample, generic.Metadata]):
    """Generic sensor type with dynamically configured data types.

    Args:
        path: path to sensor data directory. Must contain a `meta.json` file;
            see the dataset format specifications.
        create: if `True`, create a new sensor at the specified path if one is
            not already present.
        exist_ok: if `True`, do not raise an error if `create=True` and the
            sensor already exists.
        subset: if specified, only read the listed channels.
        correction: optional timestamp correction to apply (e.g.,
            smoothing); can be a callable, a string (name of a callable in
            [`roverd.timestamps`][roverd.timestamps]), or `None`.
        past: number of past samples to include.
        future: number of future samples to include.
    """

    def __init__(
        self, path: str, create: bool = False, exist_ok: bool = False,
        subset: Sequence[str] | None = None,
        correction: str | None | Callable[
            [Float64[np.ndarray, "N"]], Float64[np.ndarray, "N"]] = None,
        past: int = 0, future: int = 0
    ) -> None:
        if create:
            if os.path.exists(path):
                if not exist_ok:
                    raise ValueError(
                        "`create=True`, but this sensor already exists!")
            else:
                os.makedirs(path)
                with open(os.path.join(path, "meta.json"), 'w') as f:
                    json.dump({}, f)

        super().__init__(
            path=path, correction=correction, past=past, future=future)
        self.subset = subset

    @cached_property
    def metadata(self) -> generic.Metadata:  # type: ignore
        if 'ts' not in self.channels:
            warnings.warn(
                f"Sensor metadata does not contain 'ts' channel: {self.path}.")
            return generic.Metadata(timestamps=np.array([], dtype=np.float64))

        ts = self.channels['ts'].read(start=0, samples=-1)
        corrected = self.correction(ts)
        return generic.Metadata(timestamps=corrected)

    def _flush_config(self) -> None:
        """Flush configuration to disk.

        !!! danger

            This is an inherently dangerous operation; call with extreme
            caution!
        """
        with open(os.path.join(self.path, "meta.json"), 'w') as f:
            json.dump(self.config, f, indent=4)

    def create(
        self, channel: str, meta: dict, args: dict = {}
    ) -> channels.Channel:
        """Create and open new channel.

        Args:
            channel: name of new channel.
            meta: metadata for the new channel; see
                [`channels.from_config`][roverd.].
            args: additional arguments to pass to the channel constructor.

        Returns:
            The newly created channel; sensor metadata is also flushed to disk,
                and the channel is registered with this sensor.
        """
        self.config[channel] = meta
        self.channels[channel] = channels.from_config(
            path=os.path.join(self.path, channel), args=args, **meta)
        self._flush_config()
        return self.channels[channel]

    @overload
    def __getitem__(self, index: int | np.integer) -> TGenericSample: ...

    @overload
    def __getitem__(self, index: str) -> channels.Channel: ...

    def __getitem__(
        self, index: int | np.integer | str
    ) -> TGenericSample | channels.Channel:
        """Fetch measurement from this sensor, by index.

        Args:
            index: measurement index, or channel name.

        Returns:
            Measurement data, or channel object if `index` is a string.
        """
        if isinstance(index, str):
            return self.channels[index]
        else:  # int | np.integer
            if self.subset is not None:
                data = {
                    k: self.channels[k].read(
                        index - self.past, samples=self.window)
                    for k in self.subset}
            else:
                data = {
                    k: v.read(index - self.past, samples=self.window)
                    for k, v in self.channels.items()}
            return cast(TGenericSample, data)

__getitem__

__getitem__(index: int | integer) -> TGenericSample
__getitem__(index: str) -> Channel
__getitem__(index: int | integer | str) -> TGenericSample | Channel

Fetch measurement from this sensor, by index.

Parameters:

Name Type Description Default
index int | integer | str

measurement index, or channel name.

required

Returns:

Type Description
TGenericSample | Channel

Measurement data, or channel object if index is a string.

Source code in format/src/roverd/sensors/generic.py
def __getitem__(
    self, index: int | np.integer | str
) -> TGenericSample | channels.Channel:
    """Fetch measurement from this sensor, by index.

    Args:
        index: measurement index, or channel name.

    Returns:
        Measurement data, or channel object if `index` is a string.
    """
    if isinstance(index, str):
        return self.channels[index]
    else:  # int | np.integer
        if self.subset is not None:
            data = {
                k: self.channels[k].read(
                    index - self.past, samples=self.window)
                for k in self.subset}
        else:
            data = {
                k: v.read(index - self.past, samples=self.window)
                for k, v in self.channels.items()}
        return cast(TGenericSample, data)
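
The effect of `subset` can be sketched without roverd. In this minimal example, `ListChannel` and `read_sample` are hypothetical stand-ins; only the `read(start, samples=...)` call shape is taken from the listing above, and `window = past + future + 1` is an assumption about how `self.window` is defined.

```python
from typing import Optional, Sequence


class ListChannel:
    """Hypothetical in-memory channel exposing the `read` call used above."""

    def __init__(self, data: list) -> None:
        self.data = data

    def read(self, start: int = 0, samples: int = 1) -> list:
        return self.data[start:start + samples]


def read_sample(channels: dict, index: int, past: int = 0, future: int = 0,
                subset: Optional[Sequence[str]] = None) -> dict:
    """Read one window per channel, optionally restricted to `subset`."""
    window = past + future + 1  # assumed definition of `self.window`
    names = channels.keys() if subset is None else subset
    return {k: channels[k].read(index - past, samples=window) for k in names}


channels = {
    "ts": ListChannel([0.0, 0.1, 0.2, 0.3]),
    "acc": ListChannel([10, 11, 12, 13]),
}
print(read_sample(channels, 2, past=1))
# {'ts': [0.1, 0.2], 'acc': [11, 12]}
print(read_sample(channels, 2, past=1, subset=["acc"]))
# {'acc': [11, 12]}
```

Restricting the subset avoids reading channels that the caller does not need, which matters when some channels are large.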

create

create(channel: str, meta: dict, args: dict = {}) -> Channel

Create and open new channel.

Parameters:

Name Type Description Default
channel str

name of new channel.

required
meta dict

metadata for the new channel; see channels.from_config.

required
args dict

additional arguments to pass to the channel constructor.

{}

Returns:

Type Description
Channel

The newly created channel; sensor metadata is also flushed to disk, and the channel is registered with this sensor.

Source code in format/src/roverd/sensors/generic.py
def create(
    self, channel: str, meta: dict, args: dict = {}
) -> channels.Channel:
    """Create and open new channel.

    Args:
        channel: name of new channel.
        meta: metadata for the new channel; see
            [`channels.from_config`][roverd.].
        args: additional arguments to pass to the channel constructor.

    Returns:
        The newly created channel; sensor metadata is also flushed to disk,
            and the channel is registered with this sensor.
    """
    self.config[channel] = meta
    self.channels[channel] = channels.from_config(
        path=os.path.join(self.path, channel), args=args, **meta)
    self._flush_config()
    return self.channels[channel]
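
The `create` / `exist_ok` handling in the constructor and the `meta.json` update performed by `create` can be sketched with the standard library alone. `init_sensor_dir` and `register_channel` are hypothetical stand-ins, and the metadata dictionary is a made-up placeholder rather than a real roverd channel configuration. As a simplification, the sketch re-reads `meta.json` from disk instead of keeping the configuration in memory.

```python
import json
import os
import tempfile


def init_sensor_dir(path: str, exist_ok: bool = False) -> None:
    """Create a sensor directory with an empty `meta.json` (stand-in)."""
    if os.path.exists(path):
        if not exist_ok:
            raise ValueError("`create=True`, but this sensor already exists!")
    else:
        os.makedirs(path)
        with open(os.path.join(path, "meta.json"), "w") as f:
            json.dump({}, f)


def register_channel(path: str, channel: str, meta: dict) -> dict:
    """Add a channel entry to `meta.json` and write it back to disk."""
    meta_path = os.path.join(path, "meta.json")
    with open(meta_path) as f:
        config = json.load(f)
    config[channel] = meta
    with open(meta_path, "w") as f:
        json.dump(config, f, indent=4)
    return config


with tempfile.TemporaryDirectory() as root:
    sensor_path = os.path.join(root, "example_sensor")
    init_sensor_dir(sensor_path)                 # creates directory + meta.json
    init_sensor_dir(sensor_path, exist_ok=True)  # no error: already exists
    try:
        init_sensor_dir(sensor_path)             # exist_ok=False -> error
    except ValueError as e:
        error = str(e)
    # Placeholder metadata; real keys depend on the channel type.
    config = register_channel(sensor_path, "ts", {"note": "placeholder"})

print(config)  # {'ts': {'note': 'placeholder'}}
```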

roverd.sensors.IMU

Bases: Sensor[IMUData[ndarray], Metadata]

IMU sensor.

Parameters:

Name Type Description Default
path str

path to sensor data directory. Must contain a meta.json file; see the dataset format specifications.

required
correction str | None | Callable[[Float64[ndarray, N]], Float64[ndarray, N]]

optional timestamp correction to apply (e.g., smoothing); can be a callable, a string (name of a callable in roverd.timestamps), or None. If "auto", uses smooth(interval=30.).

None
past int

number of past samples to include.

0
future int

number of future samples to include.

0
Source code in format/src/roverd/sensors/pose.py
class IMU(Sensor[types.IMUData[np.ndarray], Metadata]):
    """IMU sensor.

    Args:
        path: path to sensor data directory. Must contain a `meta.json` file;
            see the dataset format specifications.
        correction: optional timestamp correction to apply (e.g.,
            smoothing); can be a callable, a string (name of a callable in
            [`roverd.timestamps`][roverd.timestamps]), or `None`. If `"auto"`,
            uses `smooth(interval=30.)`.
        past: number of past samples to include.
        future: number of future samples to include.
    """

    def __init__(
        self, path: str, correction: str | None | Callable[
            [Float64[np.ndarray, "N"]], Float64[np.ndarray, "N"]] = None,
        past: int = 0, future: int = 0
    ) -> None:
        if correction == "auto":
            correction = partial(timestamps.smooth, interval=30.)

        super().__init__(path, correction=correction, past=past, future=future)

        # Manual handling: on traces where we get a power cut, it's possible
        # that the entries are not the same length.
        ts = self.correction(self.channels["ts"].read(start=0, samples=-1))
        acc = self.channels["acc"].read(start=0, samples=-1)
        rot = self.channels["rot"].read(start=0, samples=-1)
        avel = self.channels["avel"].read(start=0, samples=-1)
        n = min(len(ts), len(acc), len(rot), len(avel))

        self.metadata = Metadata(ts[:n])
        self.imudata = types.IMUData(
            acc=acc[:n, None, ...], rot=rot[:n, None, ...],
            avel=avel[:n, None, ...], timestamps=ts[:n, None])

    @overload
    def __getitem__(
        self, index: int | np.integer) -> types.IMUData[np.ndarray]: ...

    @overload
    def __getitem__(self, index: str) -> channels.Channel: ...

    def __getitem__(
        self, index: int | np.integer | str
    ) -> types.IMUData[np.ndarray] | channels.Channel:
        """Fetch IMU data by index.

        Args:
            index: frame index, or channel name.

        Returns:
            IMU data, or channel object if `index` is a string.
        """
        if isinstance(index, str):
            return self.channels[index]
        else: # int | np.integer
            return types.IMUData(
                acc=self.imudata.acc[
                    index - self.past:index + self.future + 1, 0][None],
                rot=self.imudata.rot[
                    index - self.past:index + self.future + 1, 0][None],
                avel=self.imudata.avel[
                    index - self.past:index + self.future + 1, 0][None],
                timestamps=self.imudata.timestamps[
                    index - self.past:index + self.future + 1, 0][None])

__getitem__

__getitem__(index: int | integer) -> IMUData[ndarray]
__getitem__(index: str) -> Channel
__getitem__(index: int | integer | str) -> IMUData[ndarray] | Channel

Fetch IMU data by index.

Parameters:

Name Type Description Default
index int | integer | str

frame index, or channel name.

required

Returns:

Type Description
IMUData[ndarray] | Channel

IMU data, or channel object if index is a string.

Source code in format/src/roverd/sensors/pose.py
def __getitem__(
    self, index: int | np.integer | str
) -> types.IMUData[np.ndarray] | channels.Channel:
    """Fetch IMU data by index.

    Args:
        index: frame index, or channel name.

    Returns:
        IMU data, or channel object if `index` is a string.
    """
    if isinstance(index, str):
        return self.channels[index]
    else: # int | np.integer
        return types.IMUData(
            acc=self.imudata.acc[
                index - self.past:index + self.future + 1, 0][None],
            rot=self.imudata.rot[
                index - self.past:index + self.future + 1, 0][None],
            avel=self.imudata.avel[
                index - self.past:index + self.future + 1, 0][None],
            timestamps=self.imudata.timestamps[
                index - self.past:index + self.future + 1, 0][None])
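
The truncation to a common length and the axis handling used by `IMU` can be checked on stand-in arrays. This is a minimal sketch: the array contents are placeholders, and the `(N, 3)` shapes are an assumption made for illustration only.

```python
import numpy as np

# Stand-in channel contents: a power cut left `avel` one sample short.
ts = np.arange(5, dtype=np.float64)
acc = np.zeros((5, 3))
rot = np.zeros((5, 3))
avel = np.zeros((4, 3))

n = min(len(ts), len(acc), len(rot), len(avel))

# Truncate to the common length and insert a singleton axis at position 1.
acc_t = acc[:n, None, ...]
ts_t = ts[:n, None]
print(n, acc_t.shape, ts_t.shape)  # 4 (4, 1, 3) (4, 1)

# Reading index 2 with past=1, future=0 mirrors `__getitem__`: select the
# window, drop the singleton axis, then add a leading batch axis.
index, past, future = 2, 1, 0
sample = acc_t[index - past:index + future + 1, 0][None]
print(sample.shape)  # (1, 2, 3)
```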

roverd.sensors.LidarMetadata dataclass

Lidar metadata.

Attributes:

Name Type Description
timestamps Float64[ndarray, N]

timestamp for each frame; nominally in seconds.

intrinsics str

path to the lidar intrinsics file (lidar.json), or an empty string if the file is missing; see the Ouster SDK SensorInfo documentation.

Source code in format/src/roverd/sensors/lidar.py
@dataclass
class LidarMetadata:
    """Lidar metadata.

    Attributes:
        timestamps: timestamp for each frame; nominally in seconds.
        intrinsics: path to the lidar intrinsics file; see the Ouster SDK
            [`SensorInfo`](
            https://static.ouster.dev/sdk-docs/python/api/client.html#ouster.sdk.client.SensorInfo)
            documentation.
    """

    timestamps: Float64[np.ndarray, "N"]
    intrinsics: str
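
In the `OSLidar` and `OSLidarDepth` constructors documented on this page, `intrinsics` is set to the path of `lidar.json`, or to an empty string when that file is missing. A consumer therefore has to handle both cases. The sketch below is an assumption-laden illustration: `LidarMetadataSketch` and `load_intrinsics` are hypothetical, the JSON content is a placeholder rather than a real Ouster file, and the Ouster SDK is deliberately not used.

```python
import json
import os
import tempfile
from dataclasses import dataclass
from typing import Optional


@dataclass
class LidarMetadataSketch:
    """Stand-in for `LidarMetadata`, using a plain list for timestamps."""

    timestamps: list
    intrinsics: str


def load_intrinsics(meta: LidarMetadataSketch) -> Optional[dict]:
    """Parse the intrinsics JSON, or return None for the '' placeholder."""
    if not meta.intrinsics:
        return None
    with open(meta.intrinsics) as f:
        return json.load(f)


with tempfile.TemporaryDirectory() as path:
    lidar_json = os.path.join(path, "lidar.json")
    with open(lidar_json, "w") as f:
        json.dump({"placeholder": True}, f)  # not a real Ouster file

    present = load_intrinsics(LidarMetadataSketch([0.0, 0.1], lidar_json))
    missing = load_intrinsics(LidarMetadataSketch([0.0, 0.1], ""))

print(present, missing)  # {'placeholder': True} None
```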

roverd.sensors.OSLidar

Bases: Sensor[OSData[ndarray], LidarMetadata]

Ouster lidar sensor, all channels.

Parameters:

Name Type Description Default
path str

path to sensor data directory. Should contain a lidar.json file with Ouster lidar intrinsics; if it is missing, a warning is issued and an empty string is used as a placeholder.

required
correction str | None | Callable[[Float64[ndarray, N]], Float64[ndarray, N]]

optional timestamp correction to apply (e.g., smoothing); can be a callable, a string (name of a callable in roverd.timestamps), or None. If "auto", uses discretize(interval=10., eps=0.05).

None
past int

number of past samples to include.

0
future int

number of future samples to include.

0
Source code in format/src/roverd/sensors/lidar.py
class OSLidar(Sensor[types.OSData[np.ndarray], LidarMetadata]):
    """Ouster lidar sensor, all channels.

    Args:
        path: path to sensor data directory. Should contain a `lidar.json`
            file with Ouster lidar intrinsics; if it is missing, a warning is
            issued and `""` is used as a placeholder.
        correction: optional timestamp correction to apply (e.g.,
            smoothing); can be a callable, a string (name of a callable in
            [`roverd.timestamps`][roverd.timestamps]), or `None`. If `"auto"`,
            uses `discretize(interval=10., eps=0.05)`.
        past: number of past samples to include.
        future: number of future samples to include.
    """

    def __init__(
        self, path: str, correction: str | None | Callable[
            [Float64[np.ndarray, "N"]], Float64[np.ndarray, "N"]] = None,
        past: int = 0, future: int = 0
    ) -> None:
        if correction == "auto":
            correction = partial(timestamps.discretize, interval=10., eps=0.05)

        super().__init__(path, correction=correction, past=past, future=future)

        if not os.path.exists(os.path.join(path, 'lidar.json')):
            warnings.warn(
                f"No 'lidar.json' found in {path}; using '' as a placeholder.")
            intrinsics = ""
        else:
            intrinsics = os.path.join(path, "lidar.json")

        self.metadata = LidarMetadata(
            timestamps=self.correction(
                self.channels['ts'].read(start=0, samples=-1)),
            intrinsics=intrinsics)

    @overload
    def __getitem__(self, index: int | np.integer) -> types.OSData: ...

    @overload
    def __getitem__(self, index: str) -> channels.Channel: ...

    def __getitem__(
        self, index: int | np.integer | str
    ) -> types.OSData[np.ndarray] | channels.Channel:
        """Read lidar data by index.

        Args:
            index: frame index, or channel name.

        Returns:
            Lidar data, or channel object if `index` is a string.
        """
        if isinstance(index, str):
            return self.channels[index]
        else: # int | np.integer
            return types.OSData(
                rng=self.channels['rng'].read(
                    index - self.past, samples=self.window)[None],
                rfl=self.channels['rfl'].read(
                    index - self.past, samples=self.window)[None],
                nir=self.channels['nir'].read(
                    index - self.past, samples=self.window)[None],
                timestamps=self.metadata.timestamps[
                    index - self.past:index + self.future + 1][None],
                intrinsics=self.metadata.intrinsics)

__getitem__

__getitem__(index: int | integer) -> OSData
__getitem__(index: str) -> Channel
__getitem__(index: int | integer | str) -> OSData[ndarray] | Channel

Read lidar data by index.

Parameters:

Name Type Description Default
index int | integer | str

frame index, or channel name.

required

Returns:

Type Description
OSData[ndarray] | Channel

Lidar data, or channel object if index is a string.

Source code in format/src/roverd/sensors/lidar.py
def __getitem__(
    self, index: int | np.integer | str
) -> types.OSData[np.ndarray] | channels.Channel:
    """Read lidar data by index.

    Args:
        index: frame index, or channel name.

    Returns:
        Lidar data, or channel object if `index` is a string.
    """
    if isinstance(index, str):
        return self.channels[index]
    else: # int | np.integer
        return types.OSData(
            rng=self.channels['rng'].read(
                index - self.past, samples=self.window)[None],
            rfl=self.channels['rfl'].read(
                index - self.past, samples=self.window)[None],
            nir=self.channels['nir'].read(
                index - self.past, samples=self.window)[None],
            timestamps=self.metadata.timestamps[
                index - self.past:index + self.future + 1][None],
            intrinsics=self.metadata.intrinsics)

roverd.sensors.OSLidarDepth

Bases: Sensor[OSDepth[ndarray], LidarMetadata]

Ouster lidar sensor, depth/rng only.

Parameters:

Name Type Description Default
path str

path to sensor data directory. Should contain a lidar.json file with Ouster lidar intrinsics; if it is missing, a warning is issued and an empty string is used as a placeholder.

required
correction str | None | Callable[[Float64[ndarray, N]], Float64[ndarray, N]]

optional timestamp correction to apply (e.g., smoothing); can be a callable, a string (name of a callable in roverd.timestamps), or None. If "auto", uses discretize(interval=10., eps=0.05).

None
past int

number of past samples to include.

0
future int

number of future samples to include.

0
Source code in format/src/roverd/sensors/lidar.py
class OSLidarDepth(Sensor[types.OSDepth[np.ndarray], LidarMetadata]):
    """Ouster lidar sensor, depth/rng only.

    Args:
        path: path to sensor data directory. Should contain a `lidar.json`
            file with Ouster lidar intrinsics; if it is missing, a warning is
            issued and `""` is used as a placeholder.
        correction: optional timestamp correction to apply (e.g.,
            smoothing); can be a callable, a string (name of a callable in
            [`roverd.timestamps`][roverd.timestamps]), or `None`. If `"auto"`,
            uses `discretize(interval=10., eps=0.05)`.
        past: number of past samples to include.
        future: number of future samples to include.
    """

    def __init__(
        self, path: str, correction: str | None | Callable[
            [Float64[np.ndarray, "N"]], Float64[np.ndarray, "N"]] = None,
        past: int = 0, future: int = 0
    ) -> None:
        if correction == "auto":
            correction = partial(timestamps.discretize, interval=10., eps=0.05)

        super().__init__(path, correction=correction, past=past, future=future)

        if not os.path.exists(os.path.join(path, 'lidar.json')):
            warnings.warn(
                f"No 'lidar.json' found in {path}; using '' as a placeholder.")
            intrinsics = ""
        else:
            intrinsics = os.path.join(path, "lidar.json")

        self.metadata = LidarMetadata(
            timestamps=self.correction(
                self.channels['ts'].read(start=0, samples=-1)),
            intrinsics=intrinsics)

    @overload
    def __getitem__(self, index: int | np.integer) -> types.OSDepth: ...

    @overload
    def __getitem__(self, index: str) -> channels.Channel: ...

    def __getitem__(
        self, index: int | np.integer | str
    ) -> types.OSDepth[np.ndarray] | channels.Channel:
        """Read lidar data by index.

        Args:
            index: frame index, or channel name.

        Returns:
            Lidar data, or channel object if `index` is a string.
        """
        if isinstance(index, str):
            return self.channels[index]
        else: # int | np.integer
            return types.OSDepth(
                rng=self.channels['rng'].read(
                    index - self.past, samples=self.window)[None],
                timestamps=self.metadata.timestamps[
                    index - self.past:index + self.future + 1][None],
                intrinsics=self.metadata.intrinsics)

    def stream(  # type: ignore
        self, batch: int | None = None
    ) -> Iterator[types.OSDepth[np.ndarray]]:
        """Stream lidar data.

        Args:
            batch: if specified, stream in batches of this size; otherwise,
                stream one frame at a time. Batched streaming is not
                implemented and raises `NotImplementedError`.

        Yields:
            Lidar data.
        """
        if batch is not None:
            raise NotImplementedError()

        for t, rng in zip(
            self.metadata.timestamps, self.channels['rng'].stream()
        ):
            yield types.OSDepth(
                rng=rng[None, None],
                timestamps=t[None, None],
                intrinsics=self.metadata.intrinsics)

__getitem__

__getitem__(index: int | integer) -> OSDepth
__getitem__(index: str) -> Channel
__getitem__(index: int | integer | str) -> OSDepth[ndarray] | Channel

Read lidar data by index.

Parameters:

Name Type Description Default
index int | integer | str

frame index, or channel name.

required

Returns:

Type Description
OSDepth[ndarray] | Channel

Lidar data, or channel object if index is a string.

Source code in format/src/roverd/sensors/lidar.py
def __getitem__(
    self, index: int | np.integer | str
) -> types.OSDepth[np.ndarray] | channels.Channel:
    """Read lidar data by index.

    Args:
        index: frame index, or channel name.

    Returns:
        Lidar data, or channel object if `index` is a string.
    """
    if isinstance(index, str):
        return self.channels[index]
    else: # int | np.integer
        return types.OSDepth(
            rng=self.channels['rng'].read(
                index - self.past, samples=self.window)[None],
            timestamps=self.metadata.timestamps[
                index - self.past:index + self.future + 1][None],
            intrinsics=self.metadata.intrinsics)

stream

stream(batch: int | None = None) -> Iterator[OSDepth[ndarray]]

Stream lidar data.

Parameters:

Name Type Description Default
batch int | None

if specified, stream in batches of this size; otherwise, stream one frame at a time. Batched streaming is not implemented and raises NotImplementedError.

None

Yields:

Type Description
OSDepth[ndarray]

Lidar data.

Source code in format/src/roverd/sensors/lidar.py
def stream(  # type: ignore
    self, batch: int | None = None
) -> Iterator[types.OSDepth[np.ndarray]]:
    """Stream lidar data.

    Args:
        batch: if specified, stream in batches of this size; otherwise,
            stream one frame at a time. Batched streaming is not
            implemented and raises `NotImplementedError`.

    Yields:
        Lidar data.
    """
    if batch is not None:
        raise NotImplementedError()

    for t, rng in zip(
        self.metadata.timestamps, self.channels['rng'].stream()
    ):
        yield types.OSDepth(
            rng=rng[None, None],
            timestamps=t[None, None],
            intrinsics=self.metadata.intrinsics)
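
Two properties of `stream` follow from how it is written and are easy to miss: `zip` stops at the shorter of the two inputs, and the `NotImplementedError` for `batch` is raised only once iteration starts, because `stream` is a generator function. The sketch below reproduces both with a hypothetical stand-in, `stream_frames`, that yields plain dictionaries instead of `OSDepth` objects.

```python
from typing import Iterator, Optional


def stream_frames(timestamps: list, frames: Iterator,
                  batch: Optional[int] = None) -> Iterator[dict]:
    """Stand-in for `OSLidarDepth.stream`, yielding one frame at a time."""
    if batch is not None:
        raise NotImplementedError()
    for t, rng in zip(timestamps, frames):
        yield {"rng": rng, "timestamps": t}


# Three timestamps but only two frames: `zip` stops at the shorter input.
out = list(stream_frames([0.0, 0.1, 0.2], iter(["frame0", "frame1"])))
print(len(out))  # 2

# Because this is a generator function, calling it with `batch` set does not
# raise immediately; the error appears when iteration starts.
gen = stream_frames([0.0], iter(["frame0"]), batch=4)
try:
    next(gen)
    raised = False
except NotImplementedError:
    raised = True
print(raised)  # True
```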

roverd.sensors.Semseg

Bases: Sensor[CameraSemseg[ndarray], Metadata]

Generic camera semseg.

Parameters:

Name Type Description Default
path str

path to sensor data directory. Must contain a meta.json file; see the dataset format specifications.

required
key str

semseg channel name.

'segment'
correction str | None | Callable[[Float64[ndarray, N]], Float64[ndarray, N]]

optional timestamp correction to apply (e.g., smoothing); can be a callable, a string (name of a callable in roverd.timestamps), or None. If "auto", uses smooth(interval=30.).

None
past int

number of past samples to include.

0
future int

number of future samples to include.

0
Source code in format/src/roverd/sensors/camera.py
class Semseg(Sensor[types.CameraSemseg[np.ndarray], generic.Metadata]):
    """Generic camera semseg.

    Args:
        path: path to sensor data directory. Must contain a `meta.json` file;
            see the dataset format specifications.
        key: semseg channel name.
        correction: optional timestamp correction to apply (e.g.,
            smoothing); can be a callable, a string (name of a callable in
            [`roverd.timestamps`][roverd.timestamps]), or `None`. If `"auto"`,
            uses `smooth(interval=30.)`.
        past: number of past samples to include.
        future: number of future samples to include.
    """

    def __init__(
        self, path: str, key: str = "segment",
        correction: str | None | Callable[
            [Float64[np.ndarray, "N"]], Float64[np.ndarray, "N"]] = None,
        past: int = 0, future: int = 0
    ) -> None:
        if correction == "auto":
            correction = partial(timestamps.smooth, interval=30.)

        super().__init__(path, correction=correction, past=past, future=future)
        self.metadata = generic.Metadata(
            timestamps=self.correction(
                self.channels["ts"].read(start=0, samples=-1)))
        self.key = key

    @overload
    def __getitem__(self, index: int | np.integer) -> types.CameraSemseg: ...

    @overload
    def __getitem__(self, index: str) -> channels.Channel: ...

    def __getitem__(
        self, index: int | np.integer | str
    ) -> types.CameraSemseg[np.ndarray] | channels.Channel:
        """Read camera data by index.

        Args:
            index: frame index, or channel name.

        Returns:
            Semantic segmentation data, or channel object if `index` is a
            string.
        """
        if isinstance(index, str):
            return self.channels[index]
        else: # int | np.integer
            return types.CameraSemseg(
                semseg=self.channels[self.key].read(
                    index - self.past, samples=self.window)[None],
                timestamps=self.metadata.timestamps[
                    index - self.past: index + self.future + 1][None])

__getitem__

__getitem__(index: int | integer) -> CameraSemseg
__getitem__(index: str) -> Channel
__getitem__(index: int | integer | str) -> CameraSemseg[ndarray] | Channel

Read camera data by index.

Parameters:

Name Type Description Default
index int | integer | str

frame index, or channel name.

required

Returns:

Type Description
CameraSemseg[ndarray] | Channel

Semantic segmentation data, or channel object if index is a string.

Source code in format/src/roverd/sensors/camera.py
def __getitem__(
    self, index: int | np.integer | str
) -> types.CameraSemseg[np.ndarray] | channels.Channel:
    """Read camera data by index.

    Args:
        index: frame index, or channel name.

    Returns:
        Semantic segmentation data, or channel object if `index` is a string.
    """
    if isinstance(index, str):
        return self.channels[index]
    else: # int | np.integer
        return types.CameraSemseg(
            semseg=self.channels[self.key].read(
                index - self.past, samples=self.window)[None],
            timestamps=self.metadata.timestamps[
                index - self.past: index + self.future + 1][None])
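
All sensors on this page share the same `__getitem__` convention: a string index returns the channel object, while an integer index returns a sample assembled from the channels. The sketch below shows that dispatch, together with `typing.overload`, on a hypothetical `TinySensor` whose channels are plain lists; it does not use roverd.

```python
from typing import Union, overload


class TinySensor:
    """Hypothetical sensor with the same str/int dispatch as above."""

    def __init__(self, channels: dict) -> None:
        self.channels = channels

    @overload
    def __getitem__(self, index: int) -> dict: ...

    @overload
    def __getitem__(self, index: str) -> list: ...

    def __getitem__(self, index: Union[int, str]) -> Union[dict, list]:
        if isinstance(index, str):
            # Channel access by name.
            return self.channels[index]
        # Sample access: one entry from every channel.
        return {k: v[index] for k, v in self.channels.items()}


sensor = TinySensor({"ts": [0.0, 0.1], "segment": ["mask0", "mask1"]})
print(sensor["ts"])  # [0.0, 0.1]
print(sensor[1])     # {'ts': 0.1, 'segment': 'mask1'}
```

The overloads exist only for static type checkers; at runtime the single undecorated `__getitem__` handles both cases.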

roverd.sensors.Sensor

Bases: Sensor[TSample, TMetadata]

Base sensor class, providing various utility methods.

Warning

If past > 0 or future > 0, the caller is responsible for ensuring that invalid indices < past or >= len(sensor) - future are not read.
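
One way to respect this constraint is to compute the valid index range up front. This is a minimal sketch; `valid_indices` is a hypothetical helper and not part of `roverd`.

```python
def valid_indices(n: int, past: int = 0, future: int = 0) -> range:
    """Indices whose full window [i - past, i + future] lies inside [0, n)."""
    return range(past, max(past, n - future))


# A sensor with 10 samples, read with 2 past samples and 1 future sample.
print(list(valid_indices(10, past=2, future=1)))  # [2, 3, 4, 5, 6, 7, 8]

# Too few samples for even one full window: the range is empty.
print(list(valid_indices(2, past=2, future=1)))   # []
```

In practice `n` would be `len(sensor)`, with `past` and `future` matching the values passed to the sensor constructor.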

Parameters:

Name Type Description Default
path str

path to sensor data directory. Must contain a meta.json file; see the dataset format specifications.

required
correction str | None | Callable[[Float64[ndarray, N]], Float64[ndarray, N]]

optional timestamp correction to apply (e.g., smoothing); can be a callable, string (name of a callable in roverd.timestamps), or None.

None
past int

number of past samples to include.

0
future int

number of future samples to include.

0

Attributes:

Name Type Description
path

path to sensor data directory.

channels

dictionary of channels, keyed by channel name.

correction

timestamp correction function to apply.

Source code in format/src/roverd/sensors/generic.py
class Sensor(abstract.Sensor[TSample, TMetadata]):
    """Base sensor class, providing various utility methods.

    !!! warning

        If `past > 0` or `future > 0`, the caller is responsible for
        ensuring that invalid indices `< past` or `>= len(sensor) - future`
        are not read.

    Args:
        path: path to sensor data directory. Must contain a `meta.json` file;
            see the dataset format specifications.
        correction: optional timestamp correction to apply (e.g.,
            smoothing); can be a callable, string (name of a callable in
            [`roverd.timestamps`][roverd.timestamps]), or `None`.
        past: number of past samples to include.
        future: number of future samples to include.

    Attributes:
        path: path to sensor data directory.
        channels: dictionary of channels, keyed by channel name.
        correction: timestamp correction function to apply.
    """

    def __init__(
        self, path: str,
        correction: str | None | Callable[
            [Float64[np.ndarray, "N"]], Float64[np.ndarray, "N"]] = None,
        past: int = 0, future: int = 0
    ) -> None:

        self.path = path
        self.past = past
        self.future = future
        self.window = past + future + 1

        try:
            with open(os.path.join(path, "meta.json")) as f:
                self.config = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError) as e:
            raise ValueError(
                "{}: no valid 'meta.json' found.".format(str(e)))

        self.channels = {
            name: channels.from_config(
                path=os.path.join(self.path, name), **cfg)
            for name, cfg in self.config.items()}

        if correction is None:
            self.correction = timestamps.identity
        elif isinstance(correction, str):
            # Look up the function under a separate name so that the error
            # message can still report the requested name.
            func = getattr(timestamps, correction, None)
            if func is None:
                raise ValueError(
                    f"Unknown timestamp correction function: {correction}")
            self.correction = cast(
                Callable[[Float64[np.ndarray, "N"]], Float64[np.ndarray, "N"]],
                func)
        else:
            self.correction = correction

    @cached_property
    def filesize(self):
        """Total filesize, in bytes."""
        return sum(c.filesize for c in self.channels.values())

    @cached_property
    def datarate(self):
        """Total data rate, in bytes/sec."""
        return self.filesize / self.duration

    def __len__(self) -> int:
        """Total number of measurements."""
        return os.stat(os.path.join(self.path, "ts")).st_size // 8

    def __repr__(self) -> str:  # noqa: D105
        return "{}({}: [{}])".format(
            self.__class__.__name__, self.path, ", ".join(self.channels))
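
The three accepted forms of `correction` (callable, name, or `None`) can be sketched in isolation. This is an illustration only: `fake_timestamps` is a hypothetical stand-in for `roverd.timestamps`, `shift_to_zero` is an invented example function, and `resolve_correction` is not a library function.

```python
from types import SimpleNamespace
from typing import Callable

import numpy as np

Correction = Callable[[np.ndarray], np.ndarray]

# Hypothetical stand-in for the `roverd.timestamps` module.
fake_timestamps = SimpleNamespace(
    identity=lambda t: t,
    shift_to_zero=lambda t: t - t[0],
)


def resolve_correction(correction, namespace=fake_timestamps) -> Correction:
    """Turn `None`, a function name, or a callable into a callable."""
    if correction is None:
        return namespace.identity
    if isinstance(correction, str):
        func = getattr(namespace, correction, None)
        if func is None:
            raise ValueError(
                f"Unknown timestamp correction function: {correction}")
        return func
    return correction


t = np.array([5.0, 6.0, 7.0])
print(resolve_correction(None)(t).tolist())             # [5.0, 6.0, 7.0]
print(resolve_correction("shift_to_zero")(t).tolist())  # [0.0, 1.0, 2.0]
print(resolve_correction(lambda x: x * 2)(t).tolist())  # [10.0, 12.0, 14.0]
```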

datarate cached property

datarate

Total data rate, in bytes/sec.

filesize cached property

filesize

Total filesize, in bytes.

__len__

__len__() -> int

Total number of measurements.

Source code in format/src/roverd/sensors/generic.py
def __len__(self) -> int:
    """Total number of measurements."""
    return os.stat(os.path.join(self.path, "ts")).st_size // 8
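
The division by 8 suggests that the `ts` file holds one 8-byte value per measurement. The sketch below assumes raw `float64` timestamps (inferred from the `Float64` annotations, not confirmed by a format specification here) and reproduces the length calculation on a temporary file.

```python
import os
import tempfile

import numpy as np

with tempfile.TemporaryDirectory() as sensor_dir:
    ts_path = os.path.join(sensor_dir, "ts")

    # Write 25 made-up timestamps as raw 8-byte floats.
    np.linspace(0.0, 1.0, 25, dtype=np.float64).tofile(ts_path)

    # Same calculation as `Sensor.__len__`: file size over bytes per sample.
    n = os.stat(ts_path).st_size // 8

print(n)  # 25
```

Reading the length from the file size avoids loading the timestamps themselves.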

roverd.sensors.XWRRadar

Bases: Sensor[XWRRadarIQ[ndarray], RadarMetadata]

Full spectrum 4D radar sensor.

Parameters:

Name Type Description Default
path str

path to sensor data directory. Should contain a radar.json file with range_resolution and doppler_resolution keys; if the file is missing, a warning is issued and both resolutions are set to 0.

required
correction str | None | Callable[[Float64[ndarray, N]], Float64[ndarray, N]]

optional timestamp correction to apply (e.g., smoothing); can be a callable, string (name of a callable in roverd.timestamps), or None. If "auto", uses smooth(interval=30.).

None
past int

number of past samples to include.

0
future int

number of future samples to include.

0
Source code in format/src/roverd/sensors/radar.py
class XWRRadar(Sensor[types.XWRRadarIQ[np.ndarray], RadarMetadata]):
    """Full spectrum 4D radar sensor.

    Args:
        path: path to sensor data directory. Should contain a `radar.json`
            file with `range_resolution` and `doppler_resolution` keys; if
            the file is missing, a warning is issued and both resolutions
            are set to 0.
        correction: optional timestamp correction to apply (e.g.,
            smoothing); can be a callable, string (name of a callable in
            [`roverd.timestamps`][roverd.timestamps]), or `None`. If `"auto"`,
            uses `smooth(interval=30.)`.
        past: number of past samples to include.
        future: number of future samples to include.
    """

    def __init__(
        self, path: str, correction: str | None | Callable[
            [Float64[np.ndarray, "N"]], Float64[np.ndarray, "N"]] = None,
        past: int = 0, future: int = 0
    ) -> None:
        if correction == "auto":
            correction = partial(timestamps.smooth, interval=30.)

        super().__init__(path, correction=correction, past=past, future=future)

        try:
            with open(os.path.join(path, "radar.json")) as f:
                radar_cfg = json.load(f)
                dr = radar_cfg["range_resolution"]
                dd = radar_cfg["doppler_resolution"]
        except KeyError as e:
            raise KeyError(
                f"{os.path.join(path, 'radar.json')} is missing a required "
                f"key: {str(e)}") from e
        except FileNotFoundError:
            warnings.warn(
                "No `radar.json` found; setting `dr=0` and `dd=0`. "
                "This may cause problems for radar processing later!")
            dr, dd = 0.0, 0.0

        self.metadata = RadarMetadata(
            doppler_resolution=np.array([dd], dtype=np.float32),
            range_resolution=np.array([dr], dtype=np.float32),
            timestamps=self.correction(
                self.channels['ts'].read(start=0, samples=-1)))

    @overload
    def __getitem__(
        self, index: int | np.integer) -> types.XWRRadarIQ[np.ndarray]: ...

    @overload
    def __getitem__(self, index: str) -> channels.Channel: ...

    def __getitem__(
        self, index: int | np.integer | str
    ) -> types.XWRRadarIQ[np.ndarray] | channels.Channel:
        """Fetch IQ data by index.

        Args:
            index: frame index, or channel name.

        Returns:
            Radar data, or channel object if `index` is a string.
        """
        if isinstance(index, str):
            return self.channels[index]
        else: # int | np.integer
            return types.XWRRadarIQ(
                iq=self.channels['iq'].read(
                    index - self.past, samples=self.window)[None],
                timestamps=self.metadata.timestamps[
                    index - self.past:index + self.future + 1][None],
                range_resolution=self.metadata.range_resolution,
                doppler_resolution=self.metadata.doppler_resolution,
                valid=self.channels['valid'].read(
                    index - self.past, samples=self.window)[None])
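
The `radar.json` handling in the constructor above distinguishes a missing file (warning, zero defaults) from a missing key (error). The following standalone sketch mirrors that behavior; `load_resolutions` is a hypothetical helper and the resolution values are made up.

```python
import json
import os
import tempfile
import warnings


def load_resolutions(path: str) -> tuple[float, float]:
    """Return (range_resolution, doppler_resolution) from `radar.json`."""
    cfg_path = os.path.join(path, "radar.json")
    try:
        with open(cfg_path) as f:
            cfg = json.load(f)
        return cfg["range_resolution"], cfg["doppler_resolution"]
    except KeyError as e:
        # The file exists but is incomplete: treat this as an error.
        raise KeyError(f"{cfg_path} is missing a required key: {e}") from e
    except FileNotFoundError:
        # The file is absent: warn and fall back to zeros.
        warnings.warn("No `radar.json` found; setting `dr=0` and `dd=0`.")
        return 0.0, 0.0


with tempfile.TemporaryDirectory() as d:
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        missing = load_resolutions(d)  # no file yet

    with open(os.path.join(d, "radar.json"), "w") as f:
        json.dump({"range_resolution": 0.05, "doppler_resolution": 0.1}, f)
    present = load_resolutions(d)

print(missing, len(caught))  # (0.0, 0.0) 1
print(present)               # (0.05, 0.1)
```

Zero resolutions keep the sensor loadable but, as the warning says, may break radar processing that depends on physical units.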

__getitem__

__getitem__(index: int | integer) -> XWRRadarIQ[ndarray]
__getitem__(index: str) -> Channel
__getitem__(index: int | integer | str) -> XWRRadarIQ[ndarray] | Channel

Fetch IQ data by index.

Parameters:

Name Type Description Default
index int | integer | str

frame index, or channel name.

required

Returns:

Type Description
XWRRadarIQ[ndarray] | Channel

Radar data, or channel object if index is a string.

Source code in format/src/roverd/sensors/radar.py
def __getitem__(
    self, index: int | np.integer | str
) -> types.XWRRadarIQ[np.ndarray] | channels.Channel:
    """Fetch IQ data by index.

    Args:
        index: frame index, or channel name.

    Returns:
        Radar data, or channel object if `index` is a string.
    """
    if isinstance(index, str):
        return self.channels[index]
    else: # int | np.integer
        return types.XWRRadarIQ(
            iq=self.channels['iq'].read(
                index - self.past, samples=self.window)[None],
            timestamps=self.metadata.timestamps[
                index - self.past:index + self.future + 1][None],
            range_resolution=self.metadata.range_resolution,
            doppler_resolution=self.metadata.doppler_resolution,
            valid=self.channels['valid'].read(
                index - self.past, samples=self.window)[None])

roverd.sensors.from_config

from_config(
    path: str, type: str | None | Sensor | Callable[[str], Sensor]
) -> Sensor

Create sensor from configuration.

Parameters:

Name Type Description Default
path str

File path to the sensor data.

required
type str | None | Sensor | Callable[[str], Sensor]

sensor, sensor constructor, name of a sensor, or None (in which case we use DynamicSensor).

required

Returns:

Type Description
Sensor

Initialized sensor object.

Source code in format/src/roverd/sensors/__init__.py
def from_config(
    path: str, type: str | None | Sensor | Callable[[str], Sensor]
) -> Sensor:
    """Create sensor from configuration.

    Args:
        path: File path to the sensor data.
        type: sensor, sensor constructor, name of a sensor, or `None` (in which
            case we use [`DynamicSensor`][roverd.sensors.DynamicSensor]).

    Returns:
        Initialized sensor object.
    """
    if isinstance(type, Sensor):
        return type
    elif type is None:
        return DynamicSensor(path)
    elif isinstance(type, str):
        if type not in SENSOR_TYPES:
            raise ValueError(f"Unknown sensor type: {type}")
        return SENSOR_TYPES[type](path)
    else:
        return type(path)
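
The dispatch order in `from_config` can be sketched with stand-in classes. Everything named `Fake...` below is hypothetical; the real `SENSOR_TYPES` registry and sensor classes are not reproduced here, and the argument is renamed `sensor_type` to avoid shadowing the built-in `type`.

```python
class FakeSensor:
    """Stand-in for the sensor base class."""

    def __init__(self, path: str) -> None:
        self.path = path


class FakeRadar(FakeSensor):
    pass


class FakeDynamic(FakeSensor):
    pass


# Hypothetical registry mapping sensor names to constructors.
FAKE_SENSOR_TYPES = {"radar": FakeRadar}


def from_config_sketch(path: str, sensor_type) -> FakeSensor:
    if isinstance(sensor_type, FakeSensor):
        return sensor_type                # already initialized
    if sensor_type is None:
        return FakeDynamic(path)          # generic fallback
    if isinstance(sensor_type, str):
        if sensor_type not in FAKE_SENSOR_TYPES:
            raise ValueError(f"Unknown sensor type: {sensor_type}")
        return FAKE_SENSOR_TYPES[sensor_type](path)
    return sensor_type(path)              # any callable taking a path


print(type(from_config_sketch("data/radar", "radar")).__name__)   # FakeRadar
print(type(from_config_sketch("data/other", None)).__name__)      # FakeDynamic
print(type(from_config_sketch("data/radar", FakeRadar)).__name__) # FakeRadar
```

Passing an already initialized sensor returns it unchanged, so `path` is ignored in that case.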