Skip to content

CLI Tools

Usage

The CLI tools use tyro:

  • Positional arguments ("required") are passed as positional command line arguments
  • Named arguments are passed as flagged command line arguments

roverd blobify

Convert a trace to use blob channels.

uv run roverd blobify /path/to/src /path/to/dst

Warning

This script requires ffmpeg to be installed, which we use as a fast way to split mjpeg video files into individual frames.

Danger

Blob channels store each sample as a separate file, which can lead to a very large number of files and high file system overhead. This should only done on a proper blob storage backend, e.g. AWS S3 or Azure Blob Storage! Local file systems or HPC storage systems should stick to the default channel types.

Parameters:

Name Type Description Default
src str

path to the source trace.

required
dst str

path to the output trace.

required
workers int

number of worker threads for writing blobs.

64
Source code in format/src/roverd/_cli/blobify.py
def cli_blobify(src: str, dst: str, /, workers: int = 64) -> None:
    """Convert a trace to use blob channels.

    ```sh
    uv run roverd blobify /path/to/src /path/to/dst
    ```

    !!! warning

        This script requires `ffmpeg` to be installed, which we use as a fast
        way to split `mjpeg` video files into individual frames.

    !!! danger

        Blob channels store each sample as a separate file, which can lead to
        a very large number of files and high file system overhead. This should
        only done on a proper blob storage backend, e.g. AWS S3 or Azure Blob
        Storage! Local file systems or HPC storage systems should stick to
        the default channel types.

    Args:
        src: path to the source trace.
        dst: path to the output trace.
        workers: number of worker threads for writing blobs.
    """
    def _copy(file: str) -> None:
        shutil.copy(os.path.join(src, file), os.path.join(dst, file))

    trace = Trace.from_config(src, sensors={
        "_camera": None, "radar": None, "lidar": None,
        "camera": None, "imu": None,
    })

    os.makedirs(dst, exist_ok=True)
    for name, sensor in trace.sensors.items():
        assert isinstance(sensor, DynamicSensor)
        s_copy = DynamicSensor(
            os.path.join(dst, name), create=True, exist_ok=True)

        for ch_name, channel in sensor.channels.items():
            if ch_name in {"ts", "valid", "rot", "acc", "avel"}:
                ch_copy = s_copy.create(ch_name, sensor.config[ch_name])
                ch_copy.write(channel.read(start=0, samples=-1))
            else:
                cfg = {
                    "type": sensor.config[ch_name]['type'],
                    "shape": channel.shape
                }
                if ch_name == 'video.avi':
                    cfg["format"] = "jpg"
                    ch_copy = s_copy.create(ch_name, cfg)
                    os.makedirs(ch_copy.path, exist_ok=True)
                    subprocess.call(
                        f"ffmpeg -i {channel.path} -c:v copy -f image2 "
                        f"{ch_copy.path}/%06d.jpg", shell=True)
                elif ch_name == 'iq':
                    cfg["format"] = "npz"
                    ch_copy = s_copy.create(ch_name, cfg, args={
                        "compress": False, "workers": workers})
                    ch_copy.write(channel.read())
                else:
                    cfg["format"] = "npz"
                    ch_copy = s_copy.create(ch_name, cfg, args={
                        "compress": True, "workers": workers})
                    ch_copy.write(channel.read())

    _copy("lidar/lidar.json")
    _copy("radar/radar.json")
    os.makedirs(os.path.join(dst, "_radar"), exist_ok=True)
    _copy("_radar/pose.npz")
    _copy("config.yaml")

roverd extract

Extract a subset of a trace.

$ uv run roverd extract /data/grt/bike/point.out extracted-trace \
    --start 0.5 --length 0.001 --relative
$ uv run roverd info extracted-trace
start    1727901429.808
length   1.775
...

Info

Two of start, end, and length must be specified. If relative, these values are specified as a proportion of the trace duration.

Parameters:

Name Type Description Default
src str

path to the trace directory.

required
dst str

output trace directory.

required
start float | None

start time offset relative to the trace start.

None
end float | None

end time offset relative to the trace start (if positive) or trace end (if negative).

None
length float | None

length of the extracted trace in seconds.

None
relative bool

whether the start/end/length values are relative to the trace duration, in seconds.

False
Source code in format/src/roverd/_cli/extract.py
def cli_extract(
    src: str, dst: str, /, start: float | None = None, end: float | None = None,
    length: float | None = None, relative: bool = False
) -> None:
    r"""Extract a subset of a trace.

    ```sh
    $ uv run roverd extract /data/grt/bike/point.out extracted-trace \
        --start 0.5 --length 0.001 --relative
    $ uv run roverd info extracted-trace
    start    1727901429.808
    length   1.775
    ...
    ```

    !!! info

        Two of `start`, `end`, and `length` must be specified. If `relative`,
        these values are specified as a proportion of the trace duration.

    Args:
        src: path to the trace directory.
        dst: output trace directory.
        start: start time offset relative to the trace start.
        end: end time offset relative to the trace start (if positive) or
            trace end (if negative).
        length: length of the extracted trace in seconds.
        relative: whether the start/end/length values are relative to the trace
            duration, in seconds.
    """
    trace = Trace.from_config(src)

    trace_start = max(v.metadata.timestamps[0] for v in trace.sensors.values())
    trace_end = min(v.metadata.timestamps[-1] for v in trace.sensors.values())
    duration = trace_end - trace_start

    if sum(x is not None for x in (start, end, length)) < 2:
        raise ValueError(
            "Two of `start`, `end`, and `length` must be specified.")

    if end is None and start is not None and length is not None:
        end = start + length
    if start is None and end is not None and length is not None:
        start = end - length

    if relative:
        start = trace_start + (start * duration if start is not None else 0)
        end = trace_start + (end * duration if end is not None else 0)

    if os.path.exists(dst):
        raise FileExistsError(f"Output directory {dst} already exists.")

    os.makedirs(dst)
    for s_name, sensor in trace.sensors.items():
        assert isinstance(sensor, DynamicSensor)
        s_copy = DynamicSensor(os.path.join(dst, s_name), create=True)

        i_start, i_end = np.searchsorted(
            sensor.metadata.timestamps, np.array([start, end]))

        for ch_name, channel in sensor.channels.items():
            ch_copy = s_copy.create(ch_name, sensor.config[ch_name])
            ch_copy.write(channel.read(i_start, samples=i_end - i_start))

roverd info

Print trace metadata.

Only metadata for non-virtual (originally collected) sensors is shown.

uv run roverd info /data/grt/bike/point.out
Sample output
$ roverd info /data/grt/bike/point.out
start    1727900496.810
length   1865.947

total    61.2 GB (32.8 MB/s)
radar    29.3 GB (15.7 MB/s, n=37320, t=1865.9s)
    ts        299 KB (rate= 160  B/s)
    iq       29.3 GB (rate=15.7 MB/s)
    valid    37.3 KB (rate=  20  B/s)
camera   21.1 GB (11.3 MB/s, n=56053, t=1868.3s)
    ts        448 KB (rate= 240  B/s)
    video    21.1 GB (raw= 349 GB, ratio=16.52, rate=11.3 MB/s)
lidar    10.8 GB (5.78 MB/s, n=18570, t=1867.1s)
    ts        149 KB (rate=79.6  B/s)
    rfl      2.03 GB (raw=4.87 GB, ratio= 2.39, rate=1.09 MB/s)
    nir      5.27 GB (raw=9.74 GB, ratio= 1.85, rate=2.82 MB/s)
    rng      3.48 GB (raw=9.74 GB, ratio= 2.80, rate=1.86 MB/s)
imu      8.22 MB ( 4.4 KB/s, n=186847, t=1868.3s)
    ts       1.49 MB (rate= 800  B/s)
    rot      2.24 MB (rate= 1.2 KB/s)
    acc      2.24 MB (rate= 1.2 KB/s)
    avel     2.24 MB (rate= 1.2 KB/s)

Parameters:

Name Type Description Default
path str

path to the trace directory.

required
Source code in format/src/roverd/_cli/info.py
def cli_info(path: str, /,) -> None:
    """Print trace metadata.

    Only metadata for non-virtual (originally collected) sensors is shown.

    ```sh
    uv run roverd info /data/grt/bike/point.out
    ```

    ??? quote "Sample output"

        ```
        $ roverd info /data/grt/bike/point.out
        start    1727900496.810
        length   1865.947

        total    61.2 GB (32.8 MB/s)
        radar    29.3 GB (15.7 MB/s, n=37320, t=1865.9s)
            ts        299 KB (rate= 160  B/s)
            iq       29.3 GB (rate=15.7 MB/s)
            valid    37.3 KB (rate=  20  B/s)
        camera   21.1 GB (11.3 MB/s, n=56053, t=1868.3s)
            ts        448 KB (rate= 240  B/s)
            video    21.1 GB (raw= 349 GB, ratio=16.52, rate=11.3 MB/s)
        lidar    10.8 GB (5.78 MB/s, n=18570, t=1867.1s)
            ts        149 KB (rate=79.6  B/s)
            rfl      2.03 GB (raw=4.87 GB, ratio= 2.39, rate=1.09 MB/s)
            nir      5.27 GB (raw=9.74 GB, ratio= 1.85, rate=2.82 MB/s)
            rng      3.48 GB (raw=9.74 GB, ratio= 2.80, rate=1.86 MB/s)
        imu      8.22 MB ( 4.4 KB/s, n=186847, t=1868.3s)
            ts       1.49 MB (rate= 800  B/s)
            rot      2.24 MB (rate= 1.2 KB/s)
            acc      2.24 MB (rate= 1.2 KB/s)
            avel     2.24 MB (rate= 1.2 KB/s)
        ```

    Args:
        path: path to the trace directory.
    """
    ds = Trace.from_config(path)

    start = max(v.metadata.timestamps[0] for v in ds.sensors.values())
    end = min(v.metadata.timestamps[-1] for v in ds.sensors.values())
    print(f"start    {start:.3f}")
    print(f"length   {end - start:.3f}")
    print("")

    print("total    {} ({})".format(
        _size(ds.filesize),
        _size(ds.datarate, suffix='/s')))
    for sname, sensor in ds.sensors.items():
        sensor = cast(sensors.DynamicSensor, sensor)
        print("{:8} {} ({}, n={}, t={:.1f}s)".format(
            sname,
            _size(sensor.filesize),
            _size(sensor.datarate, suffix='/s'),
            len(sensor),
            sensor.duration))
        for cname, channel in sensor.channels.items():
            if sensor.config[cname]['format'] != 'raw':
                raw = channel.size * len(sensor)
                print("    {:8} {} (raw={}, ratio={:5.2f}, rate={})".format(
                    cname.split('.')[0],
                    _size(channel.filesize),
                    _size(raw),
                    raw / channel.filesize,
                    _size(channel.filesize / sensor.duration, suffix='/s')))
            else:
                print("    {:8} {} (rate={})".format(
                    cname.split('.')[0],
                    _size(channel.filesize),
                    _size(channel.filesize / sensor.duration, suffix='/s')))

roverd list

List traces (recursively) in a directory by looking for config.yaml.

$ uv run roverd list /path/to/datasets
example_a/trace1
example_a/trace2
example_b/trace1
...

Tip

This CLI is intended to be piped to other commands (and roverd CLI tools), e.g.:

# Count traces
uv run roverd list /path/to/traces | wc -l
# Loop over traces
for trace in `uv run roverd list /path/to/traces`; do echo $trace; done

Parameters:

Name Type Description Default
path str

directory to search inside.

required
follow_symlinks bool

whether to follow symlinks when searching.

False
Source code in format/src/roverd/_cli/list.py
def cli_list(path: str, /, follow_symlinks: bool = False) -> None:
    """List traces (recursively) in a directory by looking for `config.yaml`.

    ```sh
    $ uv run roverd list /path/to/datasets
    example_a/trace1
    example_a/trace2
    example_b/trace1
    ...
    ```

    !!! tip

        This CLI is intended to be piped to other commands (and `roverd` CLI
        tools), e.g.:
        ```sh
        # Count traces
        uv run roverd list /path/to/traces | wc -l
        # Loop over traces
        for trace in `uv run roverd list /path/to/traces`; do echo $trace; done
        ```

    Args:
        path: directory to search inside.
        follow_symlinks: whether to follow symlinks when searching.
    """
    traces = Dataset.find_traces(path, follow_symlinks=follow_symlinks)
    relative_traces = [os.path.relpath(trace, path) for trace in traces]
    print('\n'.join(relative_traces))

roverd validate

Validate dataset files.

$ uv run roverd validate /data/grt
Validate: 166 traces with 0 containing errors.
$ echo $?
0  # would be 1 if there were any errors
Usage with custom schema
grt.yaml
camera: ["ts", "video.avi"]
_camera: ["ts", "segment"]
lidar: ["ts", "rng"]
radar: ["ts", "iq"]
imu: ["ts", "rot", "acc", "avel"]
_:
- _camera/pose.npz
- _fusion/indices.npz
- _lidar/pose.npz
- _radar/pose.npz
- _slam/trajectory.csv
/bin/sh
$ roverd validate /data/grt --schema grt.yaml
Validate: 166 traces with 0 containing errors.

Parameters:

Name Type Description Default
path Sequence[str]

Target path or list of paths to validate.

required
schema str | None

Dataset file schema (yaml) to check. If not specified, uses a default schema which corresponds to raw files which are expected to be collected by the red-rover rig.

None
fix_errors bool

If True, fix consistency errors (data not present, but metadata is present).

False
Source code in format/src/roverd/_cli/validate.py
def cli_validate(
    path: Sequence[str], /, schema: str | None = None,
    fix_errors: bool = False
) -> None:
    """Validate dataset files.

    ```sh
    $ uv run roverd validate /data/grt
    Validate: 166 traces with 0 containing errors.
    $ echo $?
    0  # would be 1 if there were any errors
    ```

    ??? info "Usage with custom schema"

        ```yaml title="grt.yaml"
        camera: ["ts", "video.avi"]
        _camera: ["ts", "segment"]
        lidar: ["ts", "rng"]
        radar: ["ts", "iq"]
        imu: ["ts", "rot", "acc", "avel"]
        _:
        - _camera/pose.npz
        - _fusion/indices.npz
        - _lidar/pose.npz
        - _radar/pose.npz
        - _slam/trajectory.csv
        ```

        ```sh title="/bin/sh"
        $ roverd validate /data/grt --schema grt.yaml
        Validate: 166 traces with 0 containing errors.
        ```

    Args:
        path: Target path or list of paths to validate.
        schema: Dataset file schema (yaml) to check. If not specified, uses a
            default schema which corresponds to raw files which are expected
            to be collected by the `red-rover` rig.
        fix_errors: If `True`, fix consistency errors (data not present, but
            metadata is present).
    """
    # We'll get a lot of warings for missing timestamps. Don't warn, since
    # the schema will explicitly catch them if the user cares.
    warnings.filterwarnings(
        "ignore", message="Sensor metadata does not contain 'ts' channel")

    if schema is None:
        _schema = {
            "lidar": ["ts", "rfl", "nir", "rng"],
            "radar": ["ts", "iq", "valid"],
            "camera": ["ts", "video.avi"],
            "imu": ["ts", "rot", "acc", "avel"]
        }
    else:
        with open(schema) as f:
            _schema = yaml.load(f, Loader=yaml.SafeLoader)

    datasets = Dataset.find_traces(*path)
    n_errors = 0
    for path in datasets:
        errors = _validate_schema(path, _schema)
        errors += _validate_consistency(path, fix_errors=fix_errors)
        if errors:
            n_errors += 1
            print(path)
            print('\n'.join("    " + x for x in errors))

    if n_errors > 0:
        print("")
    print(
        f"Validate: {len(datasets)} traces with {n_errors} containing "
        "errors.")
    if n_errors > 0:
        exit(1)

roverd rosbag

Write lidar and IMU data to a ROS 1 bag.

uv run --extra ros roverd rosbag data/wiselab --min_range 0.5

Warning

This CLI command requires the roverd[ros] extra to be installed.

Danger

Rosbags are incredibly inefficient; expect for the output bag file to be ~10x larger than the input depth maps!

Parameters:

Name Type Description Default
path str

data path.

required
out str | None

output rosbag file path; if None, uses _scratch/lidar.bag in the dataset directory.

None
min_range float | None

minimum range (in meters) for lidar points.

None
Source code in format/src/roverd/_cli/rosbag.py
def cli_rosbag(
    path: str, /,
    out: str | None = None, min_range: float | None = None
) -> None:
    """Write lidar and IMU data to a ROS 1 bag.

    ```sh
    uv run --extra ros roverd rosbag data/wiselab --min_range 0.5
    ```

    !!! warning

        This CLI command requires the `roverd[ros]` extra to be installed.

    !!! danger

        Rosbags are incredibly inefficient; expect for the output bag file to
        be ~10x larger than the input depth maps!

    Args:
        path: data path.
        out: output rosbag file path; if `None`, uses `_scratch/lidar.bag` in
            the dataset directory.
        min_range: minimum range (in meters) for lidar points.
    """
    try:
        from roverd.transforms.ros import rover_to_rosbag
    except ImportError as e:
        raise ImportError(
            f"Could not import `rover_to_ros` ({e}). Make sure the `ros` "
            f"extra is installed (i.e., `pip install roverd[ros]`).")

    if out is None:
        out = os.path.join(path, "_scratch", "lidar.bag")

    trace = Trace.from_config(
        path, sensors={
            "lidar": partial(sensors.OSLidarDepth, correction="auto"),
            "imu": partial(sensors.IMU, correction="auto")
        })
    lidar = cast(sensors.OSLidarDepth, trace["lidar"])
    imu = cast(sensors.IMU, trace["imu"])

    rover_to_rosbag(out=out, lidar=lidar, imu=imu, min_range=min_range)