Skip to content

Data Transforms

Data transformations and compatibility layers.

roverd.transforms.ouster

Ouster Lidar transforms.

Warning

This module is not automatically imported; you will need to explicitly import it:

from roverd.transforms import ouster

You will also need to have the ouster-sdk extra installed.

roverd.transforms.ouster.ConfigCache

Ouster sensor intrinsics cache.

Source code in format/src/roverd/transforms/ouster.py
class ConfigCache:
    """Ouster sensor intrinsics cache."""

    def __init__(self) -> None:
        self._by_path: dict[str, client.SensorInfo] = {}  # type: ignore
        self._by_cfg: dict[str, client.SensorInfo] = {}  # type: ignore

    def __get_sensorinfo(self, cfg: str) -> client.SensorInfo:  # type: ignore
        # ouster-sdk is a naughty, noisy library
        # it is in fact so noisy, that we have cut it off at the os level...
        stdout = os.dup(1)
        os.close(1)
        info = client.SensorInfo(cfg)  # type: ignore
        os.dup2(stdout, 1)
        os.close(stdout)
        return info

    def __getitem__(self, path: str) -> client.SensorInfo:  # type: ignore
        """Get sensor intrinsics by path.

        - First checks if we have a `SensorInfo` loaded already for the given
            path.
        - If not, we then check if we have a `SensorInfo` loaded with an
            identical configuration.
        - If neither is found, we read the file and create a new `SensorInfo`.

        Args:
            path: Path to the sensor intrinsics file.

        Returns:
            Ouster `SensorInfo`.
        """
        # We've never seen this path before. Need to inspect it.
        if path not in self._by_path:
            try:
                with open(path) as f:
                    cfg = f.read()
            except FileNotFoundError as e:
                raise ValueError(
                    f"Lidar sensor intrinsics {path} does not exist.") from e

            # Truly new -> load
            if cfg not in self._by_cfg:
                self._by_cfg[cfg] = self.__get_sensorinfo(cfg)
            # Make a link
            self._by_path[path] = self._by_cfg[cfg]

        return self._by_path[path]

__getitem__

__getitem__(path: str) -> SensorInfo

Get sensor intrinsics by path.

  • First checks if we have a SensorInfo loaded already for the given path.
  • If not, we then check if we have a SensorInfo loaded with an identical configuration.
  • If neither is found, we read the file and create a new SensorInfo.

Parameters:

Name Type Description Default
path str

Path to the sensor intrinsics file.

required

Returns:

Type Description
SensorInfo

Ouster SensorInfo.

Source code in format/src/roverd/transforms/ouster.py
def __getitem__(self, path: str) -> client.SensorInfo:  # type: ignore
    """Get sensor intrinsics by path.

    - First checks if we have a `SensorInfo` loaded already for the given
        path.
    - If not, we then check if we have a `SensorInfo` loaded with an
        identical configuration.
    - If neither is found, we read the file and create a new `SensorInfo`.

    Args:
        path: Path to the sensor intrinsics file.

    Returns:
        Ouster `SensorInfo`.
    """
    # We've never seen this path before. Need to inspect it.
    if path not in self._by_path:
        try:
            with open(path) as f:
                cfg = f.read()
        except FileNotFoundError as e:
            raise ValueError(
                f"Lidar sensor intrinsics {path} does not exist.") from e

        # Truly new -> load
        if cfg not in self._by_cfg:
            self._by_cfg[cfg] = self.__get_sensorinfo(cfg)
        # Make a link
        self._by_path[path] = self._by_cfg[cfg]

    return self._by_path[path]

roverd.transforms.ouster.Destagger

Bases: Transform[OSDepth, Depth]

Destagger Ouster Lidar depth data.

Parameters:

Name Type Description Default
config ConfigCache | None

if provided, use this configuration cache (to share with other transforms).

None
Source code in format/src/roverd/transforms/ouster.py
class Destagger(spec.Transform[types.OSDepth, types.Depth]):
    """Destagger Ouster Lidar depth data.

    Args:
        config: if provided, use this configuration cache (to share with other
            transforms).
    """

    def __init__(self, config: ConfigCache | None = None) -> None:
        if config is None:
            config = ConfigCache()
        self._config = config

    def __call__(self, data: types.OSDepth) -> types.Depth:
        """Destagger Ouster Lidar depth data.

        Args:
            data: Ouster Lidar depth data with staggered measurements.

        Returns:
            Depth data with destaggered measurements.
        """
        batch, t, el, az = data.rng.shape

        batch_last = rearrange(data.rng, "b t el az -> el az (b t)")
        destaggered_hwb = client.destagger(  # type: ignore
            self._config[data.intrinsics], batch_last)
        destaggered = rearrange(
            destaggered_hwb, "el az (b t) -> b t el az", b=batch, t=t)

        return types.Depth(rng=destaggered, timestamps=data.timestamps)

__call__

__call__(data: OSDepth) -> Depth

Destagger Ouster Lidar depth data.

Parameters:

Name Type Description Default
data OSDepth

Ouster Lidar depth data with staggered measurements.

required

Returns:

Type Description
Depth

Depth data with destaggered measurements.

Source code in format/src/roverd/transforms/ouster.py
def __call__(self, data: types.OSDepth) -> types.Depth:
    """Destagger Ouster Lidar depth data.

    Args:
        data: Ouster Lidar depth data with staggered measurements.

    Returns:
        Depth data with destaggered measurements.
    """
    batch, t, el, az = data.rng.shape

    batch_last = rearrange(data.rng, "b t el az -> el az (b t)")
    destaggered_hwb = client.destagger(  # type: ignore
        self._config[data.intrinsics], batch_last)
    destaggered = rearrange(
        destaggered_hwb, "el az (b t) -> b t el az", b=batch, t=t)

    return types.Depth(rng=destaggered, timestamps=data.timestamps)

roverd.transforms.ouster.PointCloud

Bases: Transform[OSDepth, PointCloud]

Calculate point cloud from Ouster Lidar depth data.

Parameters:

Name Type Description Default
min_range float | None

minimum range to include in the point cloud; if None, all points are included.

None
Source code in format/src/roverd/transforms/ouster.py
class PointCloud(spec.Transform[types.OSDepth, types.PointCloud]):
    """Calculate point cloud from Ouster Lidar depth data.

    Args:
        min_range: minimum range to include in the point cloud; if `None`,
            all points are included.
    """

    def __init__(self, min_range: float | None = None) -> None:
        self.cache = ConfigCache()
        self.lut_cache = {}
        self.min_range = min_range

    def __call__(self, data: types.OSDepth) -> types.PointCloud:
        """Convert Ouster Lidar depth data to point cloud.

        Args:
            data: Ouster Lidar depth data with staggered measurements.

        Returns:
            Point cloud data.
        """
        meta = self.cache[data.intrinsics]
        mid = id(meta)

        if mid not in self.lut_cache:
            self.lut_cache[mid] = client.XYZLut(meta)  # type: ignore

        xyz = []
        for frame in data.rng:
            pc = self.lut_cache[mid](frame).astype(np.float32)

            if self.min_range is not None:
                valid = (np.linalg.norm(pc, axis=-1) >= self.min_range)
            else:
                valid = np.any(pc != 0, axis=-1)
            valid = valid & np.all(~np.isnan(pc), axis=-1)

            xyz.append(pc.reshape(-1, 3)[valid.reshape(-1)])

        return types.PointCloud(
            xyz=np.concatenate(xyz, axis=0),
            length=np.array([len(x) for x in xyz], dtype=np.int32),
            timestamps=data.timestamps)

__call__

__call__(data: OSDepth) -> PointCloud

Convert Ouster Lidar depth data to point cloud.

Parameters:

Name Type Description Default
data OSDepth

Ouster Lidar depth data with staggered measurements.

required

Returns:

Type Description
PointCloud

Point cloud data.

Source code in format/src/roverd/transforms/ouster.py
def __call__(self, data: types.OSDepth) -> types.PointCloud:
    """Convert Ouster Lidar depth data to point cloud.

    Args:
        data: Ouster Lidar depth data with staggered measurements.

    Returns:
        Point cloud data.
    """
    meta = self.cache[data.intrinsics]
    mid = id(meta)

    if mid not in self.lut_cache:
        self.lut_cache[mid] = client.XYZLut(meta)  # type: ignore

    xyz = []
    for frame in data.rng:
        pc = self.lut_cache[mid](frame).astype(np.float32)

        if self.min_range is not None:
            valid = (np.linalg.norm(pc, axis=-1) >= self.min_range)
        else:
            valid = np.any(pc != 0, axis=-1)
        valid = valid & np.all(~np.isnan(pc), axis=-1)

        xyz.append(pc.reshape(-1, 3)[valid.reshape(-1)])

    return types.PointCloud(
        xyz=np.concatenate(xyz, axis=0),
        length=np.array([len(x) for x in xyz], dtype=np.int32),
        timestamps=data.timestamps)

roverd.transforms.ros

Rover to rosbag conversion.

Warning

This module is not automatically imported; you will need to explicitly import it:

from roverd.transforms import ros

You will also need to have the ros extra installed.

roverd.transforms.ros.rover_to_rosbag

rover_to_rosbag(
    out: str, imu: IMU, lidar: OSLidarDepth, min_range: float | None = None
) -> None

Write lidar and IMU data to a ros bag (to run SLAM, e.g., cartographer).

Parameters:

Name Type Description Default
out str

output rosbag file path.

required
imu IMU

IMU sensor.

required
lidar OSLidarDepth

Lidar sensor.

required
min_range float | None

minimum range for lidar points.

None
Source code in format/src/roverd/transforms/ros.py
def rover_to_rosbag(
    out: str, imu: sensors.IMU, lidar: sensors.OSLidarDepth,
    min_range: float | None = None
) -> None:
    """Write lidar and IMU data to a ros bag (to run SLAM, e.g., cartographer).

    Args:
        out: output rosbag file path.
        imu: IMU sensor.
        lidar: Lidar sensor.
        min_range: minimum range for lidar points.
    """
    os.makedirs(os.path.dirname(out), exist_ok=True)

    with Writer(out) as writer:

        msgtype = types.sensor_msgs__msg__Imu.__msgtype__
        connection = writer.add_connection("/imu", msgtype, latching=1)
        data = zip(
            Rotation.from_euler('XYZ', imu["rot"].read()).as_quat(False),
            imu["acc"].read(), imu["avel"].read(),
            *ts_sec_ns(imu.metadata.timestamps))
        for rot, acc, avel, sec, nsec, ts in tqdm(data, total=len(imu), desc="imu"):
            msg = __sensor_msgs_imu(rot, acc, avel, sec, nsec)
            writer.write(
                connection, ts,
                cdr_to_ros1(serialize_cdr(msg, msgtype), msgtype))  # type: ignore

        to_pointcloud = ouster.PointCloud(min_range=min_range)
        msgtype = types.sensor_msgs__msg__PointCloud2.__msgtype__
        connection = writer.add_connection("/points2", msgtype, latching=1)
        data = utils.Prefetch(
            zip(lidar.stream(), *ts_sec_ns(lidar.metadata.timestamps)))
        for raw, sec, nsec, ts in tqdm(data, total=len(lidar), desc="lidar"):
            points = to_pointcloud(raw).xyz
            msg = __sensor_msgs_pointcloud2(points, sec, nsec)
            writer.write(
                connection, ts,
                cdr_to_ros1(serialize_cdr(msg, msgtype), msgtype))  # type: ignore

roverd.transforms.ros.ts_sec_ns

ts_sec_ns(t)

Convert float64 timestamp to seconds, ns remainder, and ns timestamp.

Source code in format/src/roverd/transforms/ros.py
def ts_sec_ns(t):
    """Convert float64 timestamp to seconds, ns remainder, and ns timestamp."""
    ts = (t * 1e9).astype(np.int64)
    sec = t.astype(np.int64)
    nsec = ts % (1000 * 1000 * 1000)
    return sec, nsec, ts