Skip to content

roverc

Rover data collection platform.

Supported sensors:

  • Radar: TI AWR1843 / DCA1000EVM.
  • IMU: XSens MTi-3.
  • Lidar: Ouster lidar.
  • Camera: Any UVC camera.

roverc.Camera

Bases: Sensor

Video camera.

Parameters:

Name Type Description Default
idx int

camera index in /dev, i.e. /dev/video0.

0
width int

frame width, in pixels.

1920
height int

frame height, in pixels.

1080
fps float

camera framerate. Make sure the camera/capture card supports this exact framerate!

60.0
name str

sensor name, i.e. "camera".

'camera'
Source code in collect/roverc/camera.py
class Camera(Sensor):
    """Video camera.

    Args:
        idx: camera index in `/dev`, i.e. `/dev/video0`.
        width: frame width, in pixels.
        height: frame height, in pixels.
        fps: camera framerate. Make sure the camera/capture card supports this
            exact framerate!
        name: sensor name, i.e. "camera".
    """

    def __init__(
        self, idx: int = 0, width: int = 1920, height: int = 1080,
        fps: float = 60., name: str = "camera"
    ) -> None:
        super().__init__(name=name)

        self.idx = idx
        self.fps = fps
        self.width = width
        self.height = height

        self.cap = cv2.VideoCapture(idx)
        if not self.cap.isOpened():
            self.log.critical("Failed to open camera; is it connected?")
            raise SensorException

        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
        self.cap.set(cv2.CAP_PROP_FPS, fps)
        self.log.info("Initialized camera {}: {}x{} @ {}fps".format(
            idx, width, height, fps))

    def capture(self, path: str) -> None:
        """Create capture (while `active` is set)."""
        out = CameraCapture(
            os.path.join(path, self.name), log=self.log,
            width=self.width, height=self.height, fps=self.fps)
        while self.active:
            ret = self.cap.grab()
            out.start()
            if not ret:
                self.log.error("Capture timed out.")
                self.active = False
                break

            _, frame = self.cap.retrieve()
            out.write(frame)
            out.end()

        out.close()

    def close(self):
        self.cap.release()

capture

capture(path: str) -> None

Create capture (while active is set).

Source code in collect/roverc/camera.py
def capture(self, path: str) -> None:
    """Create capture (while `active` is set)."""
    out = CameraCapture(
        os.path.join(path, self.name), log=self.log,
        width=self.width, height=self.height, fps=self.fps)
    while self.active:
        ret = self.cap.grab()
        out.start()
        if not ret:
            self.log.error("Capture timed out.")
            self.active = False
            break

        _, frame = self.cap.retrieve()
        out.write(frame)
        out.end()

    out.close()

roverc.CameraCapture

Bases: Capture

Camera capture data.

Source code in collect/roverc/camera.py
class CameraCapture(Capture):
    """Camera capture data."""

    def __init__(
        self, path: str, fps: float = 1.0,
        report_interval: float = 5.0, log: logging.Logger | None = None,
        width: int = 1920, height: int = 1080
    ) -> None:
        super().__init__(
            path=path, fps=fps, report_interval=report_interval, log=log)

        self.video: Queue = Queue()
        cast(channels.VideoChannel, self.sensor.create("video.avi", {
            "format": "mjpg", "type": "u1", "shape": (height, width, 3),
            "desc": "Ordinary camera video"
        })).consume(self.video, thread=True, fps=fps)

    def queue_length(self) -> int:
        return self.video.qsize()

    def write(self, data: np.ndarray) -> None:
        """Write MJPG stream."""
        self.video.put(data)

    def close(self) -> None:
        """Close files and clean up."""
        self.video.put(None)
        super().close()

close

close() -> None

Close files and clean up.

Source code in collect/roverc/camera.py
def close(self) -> None:
    """Close files and clean up."""
    self.video.put(None)
    super().close()

write

write(data: ndarray) -> None

Write MJPG stream.

Source code in collect/roverc/camera.py
def write(self, data: np.ndarray) -> None:
    """Write MJPG stream."""
    self.video.put(data)

roverc.Capture

Bases: ABC

Capture data for a generic sensor stream.

Parameters:

Name Type Description Default
path str

directory path to write data to.

required
fps float

target framerate

1.0
report_interval float

interval for reporting sensor statistics, in seconds

5.0
log Logger | None

parent logger to use

None
Source code in collect/roverc/common.py
class Capture(ABC):
    """Capture data for a generic sensor stream.

    Args:
        path: directory path to write data to.
        fps: target framerate
        report_interval: interval for reporting sensor statistics, in seconds
        log: parent logger to use
    """

    def __init__(
        self, path: str, fps: float = 1.0, report_interval: float = 5.0,
        log: logging.Logger | None = None
    ) -> None:
        self.sensor: DynamicSensor = DynamicSensor(path=path, create=True)
        self.ts: Queue = Queue()
        self.sensor.create("ts", {
            "format": "raw", "type": "f8", "shape": (),
            "description": "Timestamp, in seconds."}
        ).consume(self.ts, thread=True)

        self.len = 0
        self.period: list[float] = []
        self.runtime: list[float] = []
        self.qlen: int = 0

        self.prev_time = self.start_time = self.trace_time = perf_counter()
        self._ref_time: tuple[float, float] | None = None
        self._first_loop = True

        self.fps = fps
        self.report_interval = report_interval
        self.log = log if log else logging.Logger("placeholder")

    def queue_length(self) -> int:
        """Get queue length."""
        return self.ts.qsize()

    def start(self, timestamp: float | None = None) -> None:
        """Mark start of current frame processing.

        1. Records the current time as the timestamp for this frame, and
        2. Marks the start of time utilization calculation for this frame.

        Args:
            timestamp: timestamp to use; if `None`, uses the current system
                time.
        """
        t = time() if timestamp is None else timestamp
        self.start_time = perf_counter()
        self.ts.put(np.array(t, dtype=np.float64))
        self.len += 1

    def end(self):
        """Mark end of current frame processing."""
        assert self.start_time > 0
        end = perf_counter()

        self.period.append(end - self.prev_time)
        self.runtime.append(end - self.start_time)
        self.prev_time = end
        self.qlen = max(self.queue_length(), self.qlen)

        if self.len % int(self.fps * self.report_interval) == 0:
            self.reset_stats()
        if self._first_loop:
            self._first_loop = False
            self.log.info("Receiving data.")

    @abstractmethod
    def write(self, data: Any) -> None:
        """Write data."""
        ...

    def close(self) -> None:
        """Close files and clean up."""
        self.ts.put(None)

    def reset_stats(self) -> None:
        """Reset (and log) tracked statistics.

        Four values are logged:

        - `f`: frequency of the capture sensor, in Hz.
        - `u`: utilization of the capture, in percent.
        - `w`: WCET (worst-case execution time) of the capture, in ms.
        - `q`: maximum queue length.

        The log message is usually `info`, unless certain targets are violated:

        - `error`: if the observed frequency drops below 90% of the target, or
          the WCET exceeds the deadline (i.e. `T = D < C`).
        - `warning`: if the observed frequency drops below 99% of the target,
          or the WCET exceeds 90% of the deadline.
        """
        freq = 1 / np.mean(self.period)
        util = np.mean(self.runtime) * self.fps
        wcet = np.max(self.runtime)

        log_msg = (
            f"f: {freq:.2f} u: {util * 100:.0f}% "
            f"w: {wcet * 1000:.1f} q: {self.qlen}")
        if (freq < self.fps * 0.9) or (wcet * self.fps > 1.0):
            self.log.error(log_msg)
        elif (freq < self.fps * 0.99) or (wcet * self.fps > 0.9):
            self.log.warning(log_msg)
        else:
            self.log.info(log_msg)

        self.period = []
        self.runtime = []
        self.qlen = 0

close

close() -> None

Close files and clean up.

Source code in collect/roverc/common.py
def close(self) -> None:
    """Close files and clean up."""
    self.ts.put(None)

end

end()

Mark end of current frame processing.

Source code in collect/roverc/common.py
def end(self):
    """Mark end of current frame processing."""
    assert self.start_time > 0
    end = perf_counter()

    self.period.append(end - self.prev_time)
    self.runtime.append(end - self.start_time)
    self.prev_time = end
    self.qlen = max(self.queue_length(), self.qlen)

    if self.len % int(self.fps * self.report_interval) == 0:
        self.reset_stats()
    if self._first_loop:
        self._first_loop = False
        self.log.info("Receiving data.")

queue_length

queue_length() -> int

Get queue length.

Source code in collect/roverc/common.py
def queue_length(self) -> int:
    """Get queue length."""
    return self.ts.qsize()

reset_stats

reset_stats() -> None

Reset (and log) tracked statistics.

Four values are logged:

  • f: frequency of the capture sensor, in Hz.
  • u: utilization of the capture, in percent.
  • w: WCET (worst-case execution time) of the capture, in ms.
  • q: maximum queue length.

The log message is usually info, unless certain targets are violated:

  • error: if the observed frequency drops below 90% of the target, or the WCET exceeds the deadline (i.e. T = D < C).
  • warning: if the observed frequency drops below 99% of the target, or the WCET exceeds 90% of the deadline.
Source code in collect/roverc/common.py
def reset_stats(self) -> None:
    """Reset (and log) tracked statistics.

    Four values are logged:

    - `f`: frequency of the capture sensor, in Hz.
    - `u`: utilization of the capture, in percent.
    - `w`: WCET (worst-case execution time) of the capture, in ms.
    - `q`: maximum queue length.

    The log message is usually `info`, unless certain targets are violated:

    - `error`: if the observed frequency drops below 90% of the target, or
      the WCET exceeds the deadline (i.e. `T = D < C`).
    - `warning`: if the observed frequency drops below 99% of the target,
      or the WCET exceeds 90% of the deadline.
    """
    freq = 1 / np.mean(self.period)
    util = np.mean(self.runtime) * self.fps
    wcet = np.max(self.runtime)

    log_msg = (
        f"f: {freq:.2f} u: {util * 100:.0f}% "
        f"w: {wcet * 1000:.1f} q: {self.qlen}")
    if (freq < self.fps * 0.9) or (wcet * self.fps > 1.0):
        self.log.error(log_msg)
    elif (freq < self.fps * 0.99) or (wcet * self.fps > 0.9):
        self.log.warning(log_msg)
    else:
        self.log.info(log_msg)

    self.period = []
    self.runtime = []
    self.qlen = 0

start

start(timestamp: float | None = None) -> None

Mark start of current frame processing.

  1. Records the current time as the timestamp for this frame, and
  2. Marks the start of time utilization calculation for this frame.

Parameters:

Name Type Description Default
timestamp float | None

timestamp to use; if None, uses the current system time.

None
Source code in collect/roverc/common.py
def start(self, timestamp: float | None = None) -> None:
    """Mark start of current frame processing.

    1. Records the current time as the timestamp for this frame, and
    2. Marks the start of time utilization calculation for this frame.

    Args:
        timestamp: timestamp to use; if `None`, uses the current system
            time.
    """
    t = time() if timestamp is None else timestamp
    self.start_time = perf_counter()
    self.ts.put(np.array(t, dtype=np.float64))
    self.len += 1

write abstractmethod

write(data: Any) -> None

Write data.

Source code in collect/roverc/common.py
@abstractmethod
def write(self, data: Any) -> None:
    """Write data."""
    ...

roverc.IMU

Bases: Sensor

Xsens IMU Sensor.

Parameters:

Name Type Description Default
port str

serial port to use. Must have read/write permissions, e.g. sudo chmod 666 /dev/ttyUSB0.

'/dev/ttyUSB0'
baudrate int

serial baudrate.

115200
fps float

imu reading rate (Hz).

100.0
name str

sensor name, i.e. "imu".

'imu'
Source code in collect/roverc/imu.py
class IMU(Sensor):
    """Xsens IMU Sensor.

    Args:
        port: serial port to use. Must have read/write permissions, e.g.
            `sudo chmod 666 /dev/ttyUSB0`.
        baudrate: serial baudrate.
        fps: imu reading rate (Hz).
        name: sensor name, i.e. "imu".
    """

    def __init__(
        self, port: str = "/dev/ttyUSB0", baudrate: int = 115200,
        fps: float = 100.0, name: str = "imu"
    ) -> None:
        super().__init__(name=name)

        self.fps = fps
        self.imu = XsensIMU(port=port, baudrate=baudrate)
        self.log.info("Initialized IMU {}.".format(port))

    def capture(self, path: str) -> None:
        """Create capture (while `active` is set).

        NOTE: we discard the first 100 samples due to rate instability, likely
        due to serial port read latency/batching.
        """
        out = IMUCapture(
            os.path.join(path, self.name), log=self.log, fps=self.fps)
        self.imu.port.reset_input_buffer()
        while self.active:
            try:
                data = None
                while data is None:
                    data = self.imu.read()
                out.start()
                out.write(data)
                out.end()
            except InvalidChecksum:
                self.log.warn("Received invalid checksum.")
            except Exception as e:
                self.log.critical(repr(e))

        out.close()

    def close(self):
        pass

capture

capture(path: str) -> None

Create capture (while active is set).

NOTE: we discard the first 100 samples due to rate instability, likely due to serial port read latency/batching.

Source code in collect/roverc/imu.py
def capture(self, path: str) -> None:
    """Create capture (while `active` is set).

    NOTE: we discard the first 100 samples due to rate instability, likely
    due to serial port read latency/batching.
    """
    out = IMUCapture(
        os.path.join(path, self.name), log=self.log, fps=self.fps)
    self.imu.port.reset_input_buffer()
    while self.active:
        try:
            data = None
            while data is None:
                data = self.imu.read()
            out.start()
            out.write(data)
            out.end()
        except InvalidChecksum:
            self.log.warn("Received invalid checksum.")
        except Exception as e:
            self.log.critical(repr(e))

    out.close()

roverc.IMUCapture

Bases: Capture

IMU capture data.

Source code in collect/roverc/imu.py
class IMUCapture(Capture):
    """IMU capture data."""

    def __init__(
        self, path: str, fps: float = 1.0,
        report_interval: float = 5.0, log: logging.Logger | None = None
    ) -> None:
        super().__init__(
            path=path, fps=fps, report_interval=report_interval, log=log)

        self.outputs: dict[str, Queue] = {
            channel: Queue() for channel in ["rot", "acc", "avel"]}
        self.sensor.create("rot", {
            "format": "raw", "type": "f4", "shape": (3,),
            "desc": "Orientation (Euler Angles)"
        }).consume(self.outputs["rot"], thread=True)
        self.sensor.create("acc", {
            "format": "raw", "type": "f4", "shape": (3,),
            "desc": "Linear acceleration"
        }).consume(self.outputs["acc"], thread=True)
        self.sensor.create("avel", {
            "format": "raw", "type": "f4", "shape": (3,),
            "desc": "Angular velocity"
        }).consume(self.outputs["avel"], thread=True)

    def queue_length(self) -> int:
        return max(v.qsize() for v in self.outputs.values())

    def write(self, data: IMUData) -> None:
        """Write IMU data."""
        for k, v in self.outputs.items():
            v.put(getattr(data, k))

    def close(self) -> None:
        """Close files and clean up."""
        for v in self.outputs.values():
            v.put(None)
        super().close()

close

close() -> None

Close files and clean up.

Source code in collect/roverc/imu.py
def close(self) -> None:
    """Close files and clean up."""
    for v in self.outputs.values():
        v.put(None)
    super().close()

write

write(data: IMUData) -> None

Write IMU data.

Source code in collect/roverc/imu.py
def write(self, data: IMUData) -> None:
    """Write IMU data."""
    for k, v in self.outputs.items():
        v.put(getattr(data, k))

roverc.Lidar

Bases: Sensor

Ouster Lidar sensor.

Parameters:

Name Type Description Default
addr Optional[str]

lidar IP address; found automatically via avahi-browse -lrt _roger._tcp if not manually specified.

None
port_lidar int

lidar port; default 7502.

7502
port_imu int

integrated imu port; default 7503.

7503
mode str

lidar mode {columns}x{fps}.

'2048x10'
beams int

number of lidar beams.

64
name str

sensor name, i.e. "lidar".

'lidar'
Source code in collect/roverc/lidar.py
class Lidar(Sensor):
    """Ouster Lidar sensor.

    Args:
        addr: lidar IP address; found automatically via
            `avahi-browse -lrt _roger._tcp` if not manually specified.
        port_lidar: lidar port; default 7502.
        port_imu: integrated imu port; default 7503.
        mode: lidar mode `{columns}x{fps}`.
        beams: number of lidar beams.
        name: sensor name, i.e. "lidar".
    """

    def __init__(
        self, addr: Optional[str] = None, port_lidar: int = 7502,
        port_imu: int = 7503, mode: str = "2048x10", beams: int = 64,
        name: str = "lidar"
    ) -> None:
        super().__init__(name=name)

        addr = self.get_ip() if addr is None else addr
        if addr is None:
            self.log.critical("No Ouster lidar found; is it connected?")
            raise SensorException
        self.addr = addr

        config = client.SensorConfig()  # type: ignore
        config.udp_port_lidar = port_lidar
        config.udp_port_imu = port_imu
        config.operating_mode = client.OperatingMode.OPERATING_NORMAL  # type: ignore
        config.lidar_mode = client.LidarMode.from_string(mode)  # type: ignore

        # Ouster removed the ability to calculate the resolution from the
        # mode string. Not sure why, makes absolutely no sense.
        self.fps = float(mode.split('x')[1])
        self.shape = (beams, int(mode.split('x')[0]))

        client.set_config(  # type: ignore
            self.addr, config, persist=True, udp_dest_auto=True)

        self.log.info("Initialized lidar {}: {}-beam x {} x {} fps".format(
            self.addr, *self.shape, self.fps))

    @staticmethod
    def get_ip() -> Optional[str]:
        """Get IP address of ouster sensor."""
        avahi = subprocess.run(
            ["avahi-browse", "-lrpt", "_roger._tcp"], capture_output=True)
        for service in avahi.stdout.decode('utf-8').split('\n'):
            entries = service.split(';')
            match = (
                entries[0] == '=' and
                entries[2] == 'IPv4' and
                "Ouster" in entries[3] and
                len(entries) > 8)
            if match:
                return entries[7]
        else:
            return None

    def capture(self, path: str) -> None:
        """Create capture (while `active` is set)."""
        out = LidarCapture(
            os.path.join(path, self.name), log=self.log,
            fps=self.fps, shape=self.shape, compression=0)

        stream = client.Scans.stream(  # type: ignore
            hostname=self.addr, lidar_port=7502, complete=True, timeout=1.0)
        with open(os.path.join(path, self.name, "lidar.json"), 'w') as f:
            f.write(stream.metadata.to_json_string())  # type: ignore

        for scan in stream:
            out.start()
            data = {
                "rfl": scan.field(
                    client.ChanField.REFLECTIVITY).astype(np.uint8),  # type: ignore
                "nir": scan.field(
                    client.ChanField.NEAR_IR).astype(np.uint16),  # type: ignore
                "rng": np.minimum(
                    65535, scan.field(client.ChanField.RANGE)  # type: ignore
                ).astype(np.uint16)
            }
            out.write(data)  # type: ignore
            out.end()

            if not self.active:
                break

        out.close()

capture

capture(path: str) -> None

Create capture (while active is set).

Source code in collect/roverc/lidar.py
def capture(self, path: str) -> None:
    """Create capture (while `active` is set)."""
    out = LidarCapture(
        os.path.join(path, self.name), log=self.log,
        fps=self.fps, shape=self.shape, compression=0)

    stream = client.Scans.stream(  # type: ignore
        hostname=self.addr, lidar_port=7502, complete=True, timeout=1.0)
    with open(os.path.join(path, self.name, "lidar.json"), 'w') as f:
        f.write(stream.metadata.to_json_string())  # type: ignore

    for scan in stream:
        out.start()
        data = {
            "rfl": scan.field(
                client.ChanField.REFLECTIVITY).astype(np.uint8),  # type: ignore
            "nir": scan.field(
                client.ChanField.NEAR_IR).astype(np.uint16),  # type: ignore
            "rng": np.minimum(
                65535, scan.field(client.ChanField.RANGE)  # type: ignore
            ).astype(np.uint16)
        }
        out.write(data)  # type: ignore
        out.end()

        if not self.active:
            break

    out.close()

get_ip staticmethod

get_ip() -> Optional[str]

Get IP address of ouster sensor.

Source code in collect/roverc/lidar.py
@staticmethod
def get_ip() -> Optional[str]:
    """Get IP address of ouster sensor."""
    avahi = subprocess.run(
        ["avahi-browse", "-lrpt", "_roger._tcp"], capture_output=True)
    for service in avahi.stdout.decode('utf-8').split('\n'):
        entries = service.split(';')
        match = (
            entries[0] == '=' and
            entries[2] == 'IPv4' and
            "Ouster" in entries[3] and
            len(entries) > 8)
        if match:
            return entries[7]
    else:
        return None

roverc.LidarCapture

Bases: Capture

Lidar capture data.

Source code in collect/roverc/lidar.py
class LidarCapture(Capture):
    """Lidar capture data."""

    def __init__(
        self, path: str, shape: tuple[int, int] = (64, 2048),
        compression: int = 0, batch: int = 12, fps: float = 1.0,
        report_interval: float = 5.0, log: Optional[logging.Logger] = None
    ) -> None:
        super().__init__(
            path=path, fps=fps, report_interval=report_interval, log=log)

        self.outputs: dict[str, Queue] = {
            channel: Queue() for channel in ["rfl", "nir", "rng"]}

        kwargs = {"preset": compression, "batch": batch}
        cast(channels.LzmaFrameChannel, self.sensor.create("rfl", {
            "format": "lzmaf", "type": "u1", "shape": shape,
            "desc": "Object NIR reflectivity"}
        )).consume(self.outputs["rfl"], thread=True, **kwargs)
        cast(channels.LzmaFrameChannel, self.sensor.create("nir", {
            "format": "lzmaf", "type": "u2", "shape": shape,
            "desc": "Near infrared ambient photons"}
        )).consume(self.outputs["nir"], thread=True, **kwargs)
        cast(channels.LzmaFrameChannel, self.sensor.create("rng", {
            "format": "lzmaf", "type": "u2", "shape": shape,
            "desc": "Range, in millimeters"}
        )).consume(self.outputs["rng"], thread=True, **kwargs)

    def queue_length(self) -> int:
        return max(v.qsize() for v in self.outputs.values())

    def write(self, data: dict[str, np.ndarray]) -> None:
        """Write compressed lzma streams."""
        for k, v in data.items():
            self.outputs[k].put(v)

    def close(self) -> None:
        """Close files and clean up."""
        for v in self.outputs.values():
            v.put(None)
        super().close()

close

close() -> None

Close files and clean up.

Source code in collect/roverc/lidar.py
def close(self) -> None:
    """Close files and clean up."""
    for v in self.outputs.values():
        v.put(None)
    super().close()

write

write(data: dict[str, ndarray]) -> None

Write compressed lzma streams.

Source code in collect/roverc/lidar.py
def write(self, data: dict[str, np.ndarray]) -> None:
    """Write compressed lzma streams."""
    for k, v in data.items():
        self.outputs[k].put(v)

roverc.Radar

Bases: Sensor

TI AWR1843Boost Radar Sensor & DCA1000EVM capture card.

See XWRSystem for arguments.

Source code in collect/roverc/radar.py
class Radar(Sensor):
    """TI AWR1843Boost Radar Sensor & DCA1000EVM capture card.

    See [`XWRSystem`][xwr.] for arguments.
    """

    def __init__(
        self, name: str = "radar", radar: dict = {}, capture: dict = {}
    ) -> None:
        super().__init__(name=name)
        self.radar = XWRSystem(radar=radar, capture=capture)

    def capture(self, path: str) -> None:
        """Create capture (while `active` is set)."""
        out = RadarCapture(
            os.path.join(path, self.name), log=self.log,
            shape=self.radar.config.raw_shape, fps=self.radar.fps)

        with open(os.path.join(path, self.name, "radar.json"), 'w') as f:
            json.dump(self.radar.config.as_intrinsics(), f, indent=4)

        stream = self.radar.stream()
        for scan in stream:
            out.start(scan.timestamp)
            out.write(scan)
            out.end()

            if not self.active:
                break

        self.radar.stop()
        out.close()

capture

capture(path: str) -> None

Create capture (while active is set).

Source code in collect/roverc/radar.py
def capture(self, path: str) -> None:
    """Create capture (while `active` is set)."""
    out = RadarCapture(
        os.path.join(path, self.name), log=self.log,
        shape=self.radar.config.raw_shape, fps=self.radar.fps)

    with open(os.path.join(path, self.name, "radar.json"), 'w') as f:
        json.dump(self.radar.config.as_intrinsics(), f, indent=4)

    stream = self.radar.stream()
    for scan in stream:
        out.start(scan.timestamp)
        out.write(scan)
        out.end()

        if not self.active:
            break

    self.radar.stop()
    out.close()

roverc.RadarCapture

Bases: Capture

Radar capture data.

Parameters:

Name Type Description Default
path str

directory path to write data to.

required
fps float

target framerate

1.0
report_interval float

interval for reporting sensor statistics, in seconds

5.0
log Logger | None

parent logger to use

None
shape Sequence[int]

radar shape.

[]
Source code in collect/roverc/radar.py
class RadarCapture(Capture):
    """Radar capture data.

    Args:
        path: directory path to write data to.
        fps: target framerate
        report_interval: interval for reporting sensor statistics, in seconds
        log: parent logger to use
        shape: radar shape.
    """

    def __init__(
        self, path: str, fps: float = 1.0, report_interval: float = 5.0,
        log: logging.Logger | None = None, shape: Sequence[int] = []
    ) -> None:
        super().__init__(
            path=path, fps=fps, report_interval=report_interval, log=log)

        self.iq: Queue = Queue()
        self.sensor.create("iq", {
            "format": "raw", "type": "i2", "shape": shape,
            "desc": "Raw I/Q stream."}
        ).consume(self.iq, thread=True)

        self.valid: Queue = Queue()
        self.sensor.create("valid", {
            "format": "raw", "type": "u1", "shape": [],
            "desc": "True if this frame is complete (no zero-fill)."}
        ).consume(self.valid, thread=True)

    def queue_length(self) -> int:
        return self.iq.qsize()

    def write(self, data: capture.types.RadarFrame) -> None:
        """Write a single frame."""
        self.iq.put(data.data)
        self.valid.put(
            np.array(True, dtype=np.uint8) if data.complete
            else np.array(False, dtype=np.uint8))

    def close(self) -> None:
        """Close files and clean up."""
        self.iq.put(None)
        self.valid.put(None)
        super().close()

close

close() -> None

Close files and clean up.

Source code in collect/roverc/radar.py
def close(self) -> None:
    """Close files and clean up."""
    self.iq.put(None)
    self.valid.put(None)
    super().close()

write

write(data: RadarFrame) -> None

Write a single frame.

Source code in collect/roverc/radar.py
def write(self, data: capture.types.RadarFrame) -> None:
    """Write a single frame."""
    self.iq.put(data.data)
    self.valid.put(
        np.array(True, dtype=np.uint8) if data.complete
        else np.array(False, dtype=np.uint8))

roverc.Sensor

Bases: ABC

Generic sensor channel.

Communicates using local AF_UNIX sockets using the socket /tmp/rover/{name} with the provided sensor name.

Warning

Each sensor node should be initalized first.

Parameters:

Name Type Description Default
name str

sensor name, e.g. lidar, camera, radar

required
addr str

socket base address, i.e. /tmp/rover

'/tmp/rover'
report_interval float

logging interval for statistics

10.0
fps float

expected framerate for this sensor, in frames per second.

1.0
Source code in collect/roverc/common.py
class Sensor(ABC):
    """Generic sensor channel.

    Communicates using local `AF_UNIX` sockets using the socket
    `/tmp/rover/{name}` with the provided sensor name.

    !!! warning

        Each sensor node should be initalized first.

    Args:
        name: sensor name, e.g. lidar, camera, radar
        addr: socket base address, i.e. `/tmp/rover`
        report_interval: logging interval for statistics
        fps: expected framerate for this sensor, in frames per second.
    """

    def __init__(
        self, name: str, addr: str = "/tmp/rover",
        report_interval: float = 10.0, fps: float = 1.0
    ) -> None:
        self.name = name
        self.log = logging.getLogger(name=name)
        self.report_interval = report_interval
        self.fps = fps

        sock = os.path.join(addr, name)
        if os.path.exists(sock):
            os.remove(sock)
        os.makedirs(addr, exist_ok=True)

        self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self._socket.settimeout(None)
        self._socket.bind(sock)
        self._socket.listen(1)

        self.active = False
        self._thread: threading.Thread | None = None
        self.frame_count = 0

    @abstractmethod
    def capture(self, path: str) -> None:
        """Run capture; should check `self.active` on every loop."""
        ...

    def close(self) -> None:
        """Clean up sensor."""
        pass

    def _start_capture(self, path: str | None) -> None:
        """Start capture."""
        if self.active:
            self.log.error("Tried to start capture when another is active.")
        elif path is None:
            self.log.error("A valid path must be provided to start.")
        else:
            def _capture_wrapped():
                self.frame_count = 0
                try:
                    self.capture(path)
                except Exception as e:
                    self.log.critical(repr(e))
                    self.log.debug(''.join(traceback.format_exception(e)))
                finally:
                    self.active = False

            self.log.info("Starting capture: {}".format(path))
            self.active = True
            self._thread = threading.Thread(target=_capture_wrapped)
            self._thread.start()

    def _stop_capture(self) -> None:
        """End capture."""
        if not self.active:
            return

        self.log.debug("Stopping capture...")
        self.active = False
        if self._thread is not None:
            self._thread.join()
            self._thread = None
        self.log.info("Stopped capture.")

    def _recv(self) -> dict:
        """Receive message (spins until a valid message is received)."""
        while True:
            connection, _ = self._socket.accept()
            data = connection.recv(4096).decode()
            connection.close()

            try:
                return json.loads(data)
            except json.JSONDecodeError:
                self.log.error("Invalid json: {}".format(data))

    def loop(self) -> None:
        """Main control loop (spins until `exit` is received)."""
        while True:
            cmd = self._recv()
            cmd_type = cmd.get("type")
            if cmd_type == 'start':
                self._start_capture(cmd.get("path", ""))
            elif cmd_type == 'stop':
                self._stop_capture()
            elif cmd_type == 'exit':
                self.log.info("Exiting...")
                self._stop_capture()
                break
            else:
                self.log.error("Invalid command type: {}".format(cmd_type))
        self.close()

capture abstractmethod

capture(path: str) -> None

Run capture; should check self.active on every loop.

Source code in collect/roverc/common.py
@abstractmethod
def capture(self, path: str) -> None:
    """Run capture; should check `self.active` on every loop."""
    ...

close

close() -> None

Clean up sensor.

Source code in collect/roverc/common.py
def close(self) -> None:
    """Clean up sensor."""
    pass

loop

loop() -> None

Main control loop (spins until exit is received).

Source code in collect/roverc/common.py
def loop(self) -> None:
    """Main control loop (spins until `exit` is received)."""
    while True:
        cmd = self._recv()
        cmd_type = cmd.get("type")
        if cmd_type == 'start':
            self._start_capture(cmd.get("path", ""))
        elif cmd_type == 'stop':
            self._stop_capture()
        elif cmd_type == 'exit':
            self.log.info("Exiting...")
            self._stop_capture()
            break
        else:
            self.log.error("Invalid command type: {}".format(cmd_type))
    self.close()

roverc.SensorException

Bases: Exception

Sensor-related failure.

Source code in collect/roverc/common.py
class SensorException(Exception):
    """Sensor-related failure."""

    pass