xwr

TI mmWave Radar + DCA1000EVM Capture Card Raw Data Capture API.

Usage

To use the high-level API, create an XWRConfig and a DCAConfig, then pass them to XWRSystem. Use stream or qstream to automatically configure and start the radar, then stream raw data frames from it.
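A minimal sketch of that flow is shown below. The numeric values and the device name `AWR1843` are placeholders chosen for illustration, not recommended settings; substitute the name of a class that actually exists in xwr.radar. The xwr import is deferred into the function so that the configuration can be inspected without hardware attached.

```python
from itertools import islice

# Placeholder values for illustration only; derive real ones from your
# radar's datasheet or the TI mmWave sensing estimator.
RADAR = {
    "device": "AWR1843",      # assumed class name in xwr.radar
    "frequency": 77.0,        # GHz
    "idle_time": 10.0,        # us
    "adc_start_time": 6.0,    # us
    "ramp_end_time": 60.0,    # us
    "tx_start_time": 1.0,     # us
    "freq_slope": 60.0,       # MHz/us
    "adc_samples": 256,
    "sample_rate": 5000,      # kHz
    "frame_length": 64,
    "frame_period": 100.0,    # ms
}
# Only the addresses are given here; other DCAConfig fields use defaults.
CAPTURE = {"sys_ip": "192.168.33.30", "fpga_ip": "192.168.33.180"}


def capture_frames(n_frames: int) -> list:
    """Collect `n_frames` frames, then stop the system (needs hardware)."""
    from xwr import XWRSystem  # deferred: only needed when capturing

    system = XWRSystem(radar=RADAR, capture=CAPTURE, strict=True)
    try:
        return list(islice(system.stream(), n_frames))
    finally:
        system.stop()  # always halt the capture card and reboot the radar
```

Passing plain dicts relies on the documented behavior that dict arguments are forwarded to XWRConfig and DCAConfig.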

xwr.XWRSystem

Bases: Generic[TRadar]

Radar capture system with a mmWave Radar and DCA1000EVM.

Known Constraints

The XWRSystem checks the configuration against the following known constraints, and logs a warning (or raises an error if strict is set) when any of these conditions holds:

  • Radar data throughput is greater than 80% of the capture card theoretical network throughput.
  • The receive buffer (in the Linux networking stack) can hold fewer than 2 full frames.
  • The duty cycle (active frame time / frame period) of the radar is greater than 99%.
  • The ADC is still sampling when the ramp ends.
  • The range-Doppler frame size is greater than 2^14.
  • The number of samples per chirp (i.e., range resolution) or chirps per frame (i.e., doppler resolution) is not a power of 2.
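
The checks listed above can be sketched as a standalone function over plain numbers. This is an illustrative reimplementation, not the library's code: the function name `check_constraints` and its parameters are hypothetical, and the power-of-two test here additionally rejects zero.

```python
def check_constraints(
    *, radar_bps: float, capture_bps: float, socket_buffer: int,
    frame_bytes: int, frame_time_ms: float, frame_period_ms: float,
    excess_ramp_us: float, frame_length: int, adc_samples: int,
) -> list[str]:
    """Return a description of every violated constraint."""
    def is_pow2(n: int) -> bool:
        # Stricter than a bare `n & (n - 1) == 0`, which also accepts 0.
        return n > 0 and n & (n - 1) == 0

    checks = [
        (100 * radar_bps / capture_bps < 80, "network utilization > 80%"),
        (socket_buffer / frame_bytes > 2.0, "receive buffer < 2 frames"),
        (100 * frame_time_ms / frame_period_ms < 99, "duty cycle > 99%"),
        (excess_ramp_us >= 0, "ADC still sampling at ramp end"),
        (frame_length * adc_samples <= 2**14, "range-Doppler size > 2^14"),
        (is_pow2(frame_length), "frame length not a power of 2"),
        (is_pow2(adc_samples), "ADC samples not a power of 2"),
    ]
    return [desc for ok, desc in checks if not ok]


# Example numbers (assumed): 64 chirps x 256 samples, 100 ms frame period.
violations = check_constraints(
    radar_bps=62_914_560, capture_bps=700e6, socket_buffer=6_291_456,
    frame_bytes=786_432, frame_time_ms=13.44, frame_period_ms=100.0,
    excess_ramp_us=2.8, frame_length=64, adc_samples=256)
```

With these example numbers every check passes, so `violations` is empty.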

Type Parameters
  • TRadar: radar type (subclass of XWRBase)

Parameters:

Name Type Description Default
radar XWRConfig | dict

radar configuration; if dict, the key/value pairs are passed to XWRConfig.

required
capture DCAConfig | dict

capture card configuration; if dict, the key/value pairs are passed to DCAConfig.

required
name str

friendly name used for logging; the default is usually fine.

'RadarCapture'
strict bool

if True, raise an error instead of logging a warning if the radar configuration contains potentially invalid values.

False
Source code in src/xwr/system.py
class XWRSystem(Generic[TRadar]):
    """Radar capture system with a mmWave Radar and DCA1000EVM.

    !!! info "Known Constraints"

        The `XWRSystem` checks the configuration against the following
        known constraints, and logs a warning (or raises an error if
        `strict` is set) when any of these conditions holds:

        - Radar data throughput is greater than 80% of the capture card
            theoretical network throughput.
        - The receive buffer (in the Linux networking stack) can hold fewer
            than 2 full frames.
        - The duty cycle (active frame time / frame period) of the radar is
            greater than 99%.
        - The ADC is still sampling when the ramp ends.
        - The range-Doppler frame size is greater than 2^14.
        - The number of samples per chirp (i.e., range resolution) or chirps
            per frame (i.e., doppler resolution) is not a power of 2.

    Type Parameters:
        - `TRadar`: radar type (subclass of [`XWRBase`][xwr.radar.])

    Args:
        radar: radar configuration; if `dict`, the key/value pairs are passed
            to `XWRConfig`.
        capture: capture card configuration; if `dict`, the key/value pairs are
            passed to `DCAConfig`.
        name: friendly name used for logging; the default is usually fine.
        strict: if `True`, raise an error instead of logging a warning if the
            radar configuration contains potentially invalid values.
    """

    def __init__(
        self, *, radar: XWRConfig | dict, capture: DCAConfig | dict,
        name: str = "RadarCapture", strict: bool = False
    ) -> None:
        if isinstance(radar, dict):
            radar = XWRConfig(**radar)
        if isinstance(capture, dict):
            capture = DCAConfig(**capture)

        self.log: logging.Logger = logging.getLogger(name)
        # `_check_config` reads `self.strict` via `_assert`, so set it first.
        self.strict = strict
        self._check_config(radar, capture)

        self.dca: DCA1000EVM = capture.create()
        self.xwr: TRadar = cast(
            type[TRadar], radar.device_type)(port=radar.port)

        self.config = radar
        self.fps: float = 1000.0 / radar.frame_period

    def _assert(self, cond: bool, desc: str) -> None:
        """Check a condition and log (or raise) a warning if it is not met."""
        if not cond:
            if self.strict:
                raise ValueError(f"Potentially invalid configuration: {desc}")
            self.log.warning(f"Invalid radar configuration: {desc}")
        else:
            self.log.debug(f"Passed check: {desc}")

    def _check_config(self, radar: XWRConfig, capture: DCAConfig) -> None:
        """Check config, and warn if potentially invalid."""
        util = 100 * radar.throughput / capture.throughput
        self.log.info(
            f"Radar/Capture card throughput: {int(radar.throughput / 1e6)} "
            f"Mbps / {int(capture.throughput / 1e6)} Mbps ({util:.1f}%)")
        self._assert(util < 80, f"Network utilization > 80%: {util:.1f}%")

        ratio = capture.socket_buffer / radar.frame_size
        self.log.info("Recv buffer size: {:.2f} frames".format(ratio))
        self._assert(ratio > 2.0,
            f"Recv buffer < 2 frames: {capture.socket_buffer} "
            f"(1 frame = {radar.frame_size})")

        duty_cycle = 100 * radar.frame_time / radar.frame_period
        self.log.info(f"Radar duty cycle: {duty_cycle:.1f}%")
        self._assert(duty_cycle < 99, f"Duty cycle > 99%: {duty_cycle:.1f}%")

        excess = radar.ramp_end_time - radar.adc_start_time - radar.sample_time
        self.log.info(f"Excess ramp time: {excess:.1f}us")
        self._assert(excess >= 0, f"Excess ramp time < 0: {excess:.1f}us")

        frame_size = radar.frame_length * radar.adc_samples
        self.log.info(
            f"Range-Doppler size: {radar.frame_length} x "
            f"{radar.adc_samples} = {frame_size}")
        self._assert(frame_size <= 2**14,
            f"Range-doppler frame size > 2^14: {frame_size}")
        self._assert(radar.frame_length & (radar.frame_length - 1) == 0,
            f"Frame length not a power of 2: {radar.frame_length}")
        self._assert(radar.adc_samples & (radar.adc_samples - 1) == 0,
            f"ADC samples not a power of 2: {radar.adc_samples}")

    def stream(self) -> Iterator[types.RadarFrame]:
        """Iterator which yields successive frames.

        !!! note

            `.stream()` does not internally terminate data collection;
            another worker must call [`stop`][..].

        Yields:
            Read frames; the iterator terminates when the capture card stream
                times out.
        """
        # send a "stop" command in case the capture card is still running
        self.dca.stop()
        # reboot radar in case it is stuck
        self.dca.reset_ar_device()
        # clear buffer from possible previous data collection
        # (will mess up byte count indices if we don't)
        self.dca.flush()

        # start capture card & radar
        self.dca.start()
        self.xwr.setup(**self.config.as_dict())
        self.xwr.start()

        return self.dca.stream(self.config.raw_shape)

    @overload
    def qstream(self, numpy: Literal[True]) -> Queue[np.ndarray | None]: ...

    @overload
    def qstream(
        self, numpy: Literal[False] = False
    ) -> Queue[types.RadarFrame | None]: ...

    def qstream(
        self, numpy: bool = False
    ) -> Queue[types.RadarFrame | None] | Queue[np.ndarray | None]:
        """Read into a queue from a threaded worker.

        The threaded worker is run with `daemon=True`. Like [`stream`][..],
        `.qstream()` also relies on another worker to trigger [`stop`][..].

        !!! note

            If a `TimeoutError` is received (e.g. after `.stop()`), the
            error is caught, and the stream is halted.

        Args:
            numpy: enqueue numpy arrays instead of `RadarFrame` objects.

        Returns:
            A queue of `RadarFrame` (or np.ndarray) read by the capture card.
                When the stream terminates, `None` is written to the queue.
        """
        out: Queue[types.RadarFrame | None] | Queue[np.ndarray | None] = Queue()

        def worker():
            try:
                for frame in self.stream():
                    if numpy:
                        if frame is not None:
                            frame = np.frombuffer(
                                frame.data, dtype=np.int16
                            ).reshape(*self.config.raw_shape)
                        # Type inference can't figure out this overload check
                        cast(Queue[np.ndarray | None], out).put(frame)
                    else:
                        out.put(frame)
            except TimeoutError:
                pass
            out.put(None)

        threading.Thread(target=worker, daemon=True).start()
        return out

    @overload
    def dstream(self, numpy: Literal[True]) -> Iterator[np.ndarray]: ...

    @overload
    def dstream(
        self, numpy: Literal[False] = False
    ) -> Iterator[types.RadarFrame]: ...

    def dstream(
        self, numpy: bool = False
    ) -> Iterator[types.RadarFrame | np.ndarray]:
        """Stream frames, dropping any frames if the consumer gets behind.

        Args:
            numpy: yield a numpy array instead of a `RadarFrame`.

        Yields:
            Read frames; the iterator terminates when the capture card stream
                times out.
        """
        def drop_frames(q):
            dropped = 0
            latest = q.get(block=True)

            while True:
                try:
                    latest = q.get_nowait()
                    dropped += 1
                except Empty:
                    return latest, dropped

        q = self.qstream(numpy=numpy)
        while True:
            frame, dropped = drop_frames(q)
            if dropped > 0:
                self.log.warning(f"Dropped {dropped} frames.")
            if frame is None:
                break
            else:
                yield frame

    def stop(self) -> None:
        """Stop by halting the capture card and rebooting the radar.

        In testing, we found that the radar may ignore commands if the frame
        timings are too tight, which prevents a soft reset. We simply reboot
        the radar via the capture card instead.

        !!! warning

            If you fail to `.stop()` the system before exiting, the radar may
            become non-responsive, and require a power cycle.
        """
        self.dca.stop()
        self.dca.reset_ar_device()

dstream

dstream(numpy: Literal[True]) -> Iterator[ndarray]
dstream(numpy: Literal[False] = False) -> Iterator[RadarFrame]
dstream(numpy: bool = False) -> Iterator[RadarFrame | ndarray]

Stream frames, dropping any frames if the consumer gets behind.

Parameters:

Name Type Description Default
numpy bool

yield a numpy array instead of a RadarFrame.

False

Yields:

Type Description
RadarFrame | ndarray

Read frames; the iterator terminates when the capture card stream times out.
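The frame-dropping behavior can be illustrated with a plain `queue.Queue`. This is a standalone sketch of the same pattern; the helper name `latest_item` is chosen for illustration, and integers stand in for frames.

```python
from queue import Empty, Queue


def latest_item(q: Queue):
    """Block for one item, then discard all but the newest queued item."""
    dropped = 0
    latest = q.get(block=True)
    while True:
        try:
            latest = q.get_nowait()
            dropped += 1
        except Empty:
            return latest, dropped


q: Queue = Queue()
for i in range(5):      # a slow consumer finds 5 frames waiting
    q.put(i)
print(latest_item(q))   # (4, 4): newest item kept, 4 older ones dropped
```

One consequence of this pattern is that a terminating `None` supersedes any frames queued ahead of it: they are counted as dropped and the stream ends.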

Source code in src/xwr/system.py
def dstream(
    self, numpy: bool = False
) -> Iterator[types.RadarFrame | np.ndarray]:
    """Stream frames, dropping any frames if the consumer gets behind.

    Args:
        numpy: yield a numpy array instead of a `RadarFrame`.

    Yields:
        Read frames; the iterator terminates when the capture card stream
            times out.
    """
    def drop_frames(q):
        dropped = 0
        latest = q.get(block=True)

        while True:
            try:
                latest = q.get_nowait()
                dropped += 1
            except Empty:
                return latest, dropped

    q = self.qstream(numpy=numpy)
    while True:
        frame, dropped = drop_frames(q)
        if dropped > 0:
            self.log.warning(f"Dropped {dropped} frames.")
        if frame is None:
            break
        else:
            yield frame

qstream

qstream(numpy: Literal[True]) -> Queue[ndarray | None]
qstream(numpy: Literal[False] = False) -> Queue[RadarFrame | None]
qstream(
    numpy: bool = False,
) -> Queue[RadarFrame | None] | Queue[ndarray | None]

Read into a queue from a threaded worker.

The threaded worker is run with daemon=True. Like stream, .qstream() also relies on another worker to trigger stop.

Note

If a TimeoutError is received (e.g. after .stop()), the error is caught, and the stream is halted.

Parameters:

Name Type Description Default
numpy bool

enqueue numpy arrays instead of RadarFrame objects.

False

Returns:

Type Description
Queue[RadarFrame | None] | Queue[ndarray | None]

A queue of RadarFrame (or np.ndarray) read by the capture card. When the stream terminates, None is written to the queue.
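A consumer typically reads from the queue until it sees the `None` sentinel. The sketch below uses a hypothetical `fake_qstream` producer in place of the real method so that it runs without hardware.

```python
import threading
from queue import Queue


def fake_qstream(n_frames: int) -> Queue:
    """Stand-in for `qstream()`: a daemon thread enqueues frames, then None."""
    out: Queue = Queue()

    def worker() -> None:
        for i in range(n_frames):
            out.put(f"frame-{i}")  # placeholder for a RadarFrame
        out.put(None)              # sentinel: the stream has terminated

    threading.Thread(target=worker, daemon=True).start()
    return out


def consume(q: Queue) -> list:
    """Read items until the None sentinel arrives."""
    frames = []
    # Use an identity check: `==` on a numpy array is elementwise.
    while (item := q.get()) is not None:
        frames.append(item)
    return frames


frames = consume(fake_qstream(3))
```

The identity check matters when `numpy=True`: the two-argument form `iter(q.get, None)` compares each item with `==`, which is ambiguous for arrays.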

Source code in src/xwr/system.py
def qstream(
    self, numpy: bool = False
) -> Queue[types.RadarFrame | None] | Queue[np.ndarray | None]:
    """Read into a queue from a threaded worker.

    The threaded worker is run with `daemon=True`. Like [`stream`][..],
    `.qstream()` also relies on another worker to trigger [`stop`][..].

    !!! note

        If a `TimeoutError` is received (e.g. after `.stop()`), the
        error is caught, and the stream is halted.

    Args:
        numpy: enqueue numpy arrays instead of `RadarFrame` objects.

    Returns:
        A queue of `RadarFrame` (or np.ndarray) read by the capture card.
            When the stream terminates, `None` is written to the queue.
    """
    out: Queue[types.RadarFrame | None] | Queue[np.ndarray | None] = Queue()

    def worker():
        try:
            for frame in self.stream():
                if numpy:
                    if frame is not None:
                        frame = np.frombuffer(
                            frame.data, dtype=np.int16
                        ).reshape(*self.config.raw_shape)
                    # Type inference can't figure out this overload check
                    cast(Queue[np.ndarray | None], out).put(frame)
                else:
                    out.put(frame)
        except TimeoutError:
            pass
        out.put(None)

    threading.Thread(target=worker, daemon=True).start()
    return out

stop

stop() -> None

Stop by halting the capture card and rebooting the radar.

In testing, we found that the radar may ignore commands if the frame timings are too tight, which prevents a soft reset. We simply reboot the radar via the capture card instead.

Warning

If you fail to .stop() the system before exiting, the radar may become non-responsive, and require a power cycle.
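One way to reduce the risk of exiting without stopping is to wrap the system in a context manager. The sketch below is an assumption about usage rather than part of the library; `stopping` and `FakeSystem` are hypothetical names, and the fake object only mimics the `.stop()` method.

```python
from contextlib import contextmanager


@contextmanager
def stopping(system):
    """Yield `system`, and call its `.stop()` on exit, even after an error."""
    try:
        yield system
    finally:
        system.stop()


class FakeSystem:
    """Hypothetical stand-in exposing the same `.stop()` method."""

    def __init__(self) -> None:
        self.stopped = False

    def stop(self) -> None:
        self.stopped = True


fake = FakeSystem()
try:
    with stopping(fake):
        raise KeyboardInterrupt  # e.g. the user pressing Ctrl+C mid-capture
except KeyboardInterrupt:
    pass
```

This covers exceptions and Ctrl+C, but not a process that is killed outright; in that case the radar may still need a power cycle.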

Source code in src/xwr/system.py
def stop(self) -> None:
    """Stop by halting the capture card and rebooting the radar.

    In testing, we found that the radar may ignore commands if the frame
    timings are too tight, which prevents a soft reset. We simply reboot
    the radar via the capture card instead.

    !!! warning

        If you fail to `.stop()` the system before exiting, the radar may
        become non-responsive, and require a power cycle.
    """
    self.dca.stop()
    self.dca.reset_ar_device()

stream

stream() -> Iterator[RadarFrame]

Iterator which yields successive frames.

Note

.stream() does not internally terminate data collection; another worker must call stop.

Yields:

Type Description
RadarFrame

Read frames; the iterator terminates when the capture card stream times out.
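Because the iterator does not end on its own, a second worker has to call stop. The sketch below uses a `threading.Timer` for that; `FakeSystem` and `record` are hypothetical names, and the fake stream ends as soon as stop is called, whereas the real one ends once the capture card read times out.

```python
import threading
import time


class FakeSystem:
    """Hypothetical stand-in: `stream()` ends shortly after `stop()`."""

    def __init__(self) -> None:
        self._stopped = threading.Event()

    def stream(self):
        while not self._stopped.is_set():
            yield b"frame"
            time.sleep(0.001)

    def stop(self) -> None:
        self._stopped.set()


def record(system, seconds: float) -> int:
    """Consume `system.stream()` while a timer thread calls `stop()`."""
    timer = threading.Timer(seconds, system.stop)
    timer.start()
    try:
        return sum(1 for _ in system.stream())
    finally:
        timer.cancel()   # no-op if the timer already fired
        system.stop()    # assumed safe to repeat; covers early exits


n = record(FakeSystem(), 0.05)
```

The repeated `stop()` in the `finally` clause assumes that stopping an already-stopped system is harmless, which is consistent with stream issuing the same stop and reset commands before it starts.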

Source code in src/xwr/system.py
def stream(self) -> Iterator[types.RadarFrame]:
    """Iterator which yields successive frames.

    !!! note

        `.stream()` does not internally terminate data collection;
        another worker must call [`stop`][..].

    Yields:
        Read frames; the iterator terminates when the capture card stream
            times out.
    """
    # send a "stop" command in case the capture card is still running
    self.dca.stop()
    # reboot radar in case it is stuck
    self.dca.reset_ar_device()
    # clear buffer from possible previous data collection
    # (will mess up byte count indices if we don't)
    self.dca.flush()

    # start capture card & radar
    self.dca.start()
    self.xwr.setup(**self.config.as_dict())
    self.xwr.start()

    return self.dca.stream(self.config.raw_shape)

xwr.DCAConfig dataclass

DCA1000EVM Capture card configuration.

Attributes:

Name Type Description
sys_ip str

system IP; should be manually configured with a subnet mask of 255.255.255.0.

fpga_ip str

FPGA IP address; either hard-coded or configured.

data_port int

data network port number.

config_port int

configuration network port number.

timeout float

Socket read timeout, in seconds.

socket_buffer int

Network read buffer size; should be less than rmem_max.

delay float

Packet delay for the capture card, in microseconds.
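On Linux, the kernel limit that socket_buffer is compared against can be read from `/proc/sys/net/core/rmem_max`. The helpers below are a hypothetical pre-flight check, not part of the library; they treat a buffer equal to the limit as acceptable.

```python
from pathlib import Path
from typing import Optional

RMEM_MAX = Path("/proc/sys/net/core/rmem_max")


def read_rmem_max(path: Path = RMEM_MAX) -> Optional[int]:
    """Return the kernel's rmem_max in bytes, or None if unavailable."""
    try:
        return int(path.read_text().strip())
    except (OSError, ValueError):
        return None  # e.g. not running on Linux


def buffer_fits(socket_buffer: int, rmem_max: Optional[int]) -> bool:
    """True if the requested buffer does not exceed a known rmem_max."""
    return rmem_max is None or socket_buffer <= rmem_max


limit = read_rmem_max()
if not buffer_fits(6291456, limit):  # 6291456 is the default socket_buffer
    print(f"rmem_max is {limit}; raise it or lower socket_buffer")
```

If the limit is too low, it can usually be raised with `sysctl -w net.core.rmem_max=<bytes>` as root.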

Source code in src/xwr/config.py
@dataclass
class DCAConfig:
    """DCA1000EVM Capture card configuration.

    Attributes:
        sys_ip: system IP; should be manually configured with a subnet mask of
            `255.255.255.0`.
        fpga_ip: FPGA IP address; either hard-coded or configured.
        data_port: data network port number.
        config_port: configuration network port number.
        timeout: Socket read timeout, in seconds.
        socket_buffer: Network read buffer size; should be less than
            [`rmem_max`](https://www.kernel.org/doc/html/latest/admin-guide/sysctl/net.html#rmem-max).
        delay: Packet delay for the capture card, in microseconds.
    """

    sys_ip: str = "192.168.33.30"
    fpga_ip: str = "192.168.33.180"
    data_port: int = 4098
    config_port: int = 4096
    timeout: float = 1.0
    socket_buffer: int = 6291456
    delay: float = 5.0

    @property
    def throughput(self) -> float:
        """Theoretical maximum data rate, in bits/sec."""
        packet_time = (
            defines.DCAConstants.DCA_PACKET_SIZE
            * 8 / defines.DCAConstants.DCA_BITRATE + self.delay / 1e6)
        return 1 / packet_time * defines.DCAConstants.DCA_PACKET_SIZE * 8

    def create(self) -> DCA1000EVM:
        """Initialize and setup capture card from this configuration."""
        dca = DCA1000EVM(
            sys_ip=self.sys_ip, fpga_ip=self.fpga_ip,
            data_port=self.data_port, config_port=self.config_port,
            timeout=self.timeout, socket_buffer=self.socket_buffer)
        dca.setup(delay=self.delay)
        return dca

throughput property

throughput: float

Theoretical maximum data rate, in bits/sec.
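The throughput model charges each packet its transmission time plus the configured delay. The sketch below reproduces that arithmetic with assumed constants: the real packet size and bit rate live in defines.DCAConstants and may differ from the placeholder values used here.

```python
# Assumed values: the real ones are defined in defines.DCAConstants.
PACKET_SIZE = 1472   # bytes per packet (assumption)
BITRATE = 1e9        # link bit rate in bits/sec (assumption)


def dca_throughput(delay_us: float) -> float:
    """Bits/sec when each packet costs its wire time plus a fixed delay."""
    packet_bits = PACKET_SIZE * 8
    packet_time = packet_bits / BITRATE + delay_us / 1e6  # seconds
    return packet_bits / packet_time


for delay in (5.0, 10.0, 25.0):
    print(f"delay={delay:>4} us -> {dca_throughput(delay) / 1e6:.0f} Mbps")
```

Under these assumptions the default 5 us delay gives roughly 700 Mbps, and larger delays lower the ceiling against which the 80% utilization check is made.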

create

create() -> DCA1000EVM

Initialize and setup capture card from this configuration.

Source code in src/xwr/config.py
def create(self) -> DCA1000EVM:
    """Initialize and setup capture card from this configuration."""
    dca = DCA1000EVM(
        sys_ip=self.sys_ip, fpga_ip=self.fpga_ip,
        data_port=self.data_port, config_port=self.config_port,
        timeout=self.timeout, socket_buffer=self.socket_buffer)
    dca.setup(delay=self.delay)
    return dca

xwr.XWRConfig dataclass

Radar configuration.

The TI mmWave sensing estimator may be helpful for creating a configuration.

Attributes:

Name Type Description
device type[XWRBase] | str

radar device type, or the name of a radar device class in xwr.radar.

frequency float

base frequency, in GHz.

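idle_time float

idle time between the end of one chirp and the start of the next ramp, in microseconds.

adc_start_time float

delay from the start of the ramp until ADC sampling begins, in microseconds.

ramp_end_time float

time from the start of the ramp until it ends, in microseconds.

tx_start_time float

transmitter start time relative to the start of the ramp, in microseconds.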

freq_slope float

chirp slope, in MHz/us.

adc_samples int

number of samples per chirp. Must be a power of two.

sample_rate int

ADC sampling rate, in kHz.

frame_length int

number of chirps per TX antenna per frame. Must be a power of two.

frame_period float

periodicity of frames, in ms.

port str | None

Control serial port (usually /dev/ttyACM0). Use None to auto-detect; see XWRBase.

Source code in src/xwr/config.py
@dataclass
class XWRConfig:
    """Radar configuration.

    The [TI mmWave sensing estimator](
    https://dev.ti.com/gallery/view/mmwave/mmWaveSensingEstimator/ver/2.4.0/)
    may be helpful for creating a configuration.

    Attributes:
        device: radar device type, or the name of a radar device class in
            [`xwr.radar`][xwr.radar].
        frequency: base frequency, in GHz.
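        idle_time: idle time between the end of one chirp and the start of
            the next ramp, in microseconds.
        adc_start_time: delay from the start of the ramp until ADC sampling
            begins, in microseconds.
        ramp_end_time: time from the start of the ramp until it ends, in
            microseconds.
        tx_start_time: transmitter start time relative to the start of the
            ramp, in microseconds.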
        freq_slope: chirp slope, in MHz/us.
        adc_samples: number of samples per chirp. Must be a power of two.
        sample_rate: ADC sampling rate, in kHz.
        frame_length: number of chirps per TX antenna per frame. Must be a
            power of two.
        frame_period: periodicity of frames, in ms.
        port: Control serial port (usually `/dev/ttyACM0`). Use `None` to
            auto-detect; see [`XWRBase`][xwr.radar.XWRBase].
    """

    device: type[radar.XWRBase] | str
    frequency: float
    idle_time: float
    adc_start_time: float
    ramp_end_time: float
    tx_start_time: float
    freq_slope: float
    adc_samples: int
    sample_rate: int
    frame_length: int
    frame_period: float
    port: str | None = None

    @property
    def device_type(self) -> type[radar.XWRBase]:
        """Radar device type."""
        if isinstance(self.device, str):
            try:
                return getattr(radar, self.device)
            except AttributeError:
                raise ValueError(f"Unknown radar device: {self.device}")
        else:
            return self.device

    @property
    def num_tx(self) -> int:
        """Number of TX antennas."""
        return self.device_type.NUM_TX

    @property
    def num_rx(self) -> int:
        """Number of RX antennas."""
        return self.device_type.NUM_RX

    @property
    def shape(self) -> tuple[int, int, int, int]:
        """Radar data cube shape."""
        return (
            self.frame_length, self.num_tx, self.num_rx, self.adc_samples)

    @property
    def raw_shape(self) -> tuple[int, int, int, int]:
        """Radar IIQQ data shape."""
        return (
            self.frame_length, self.num_tx, self.num_rx, self.adc_samples * 2)

    @property
    def frame_size(self) -> int:
        """Radar data cube size, in bytes."""
        return (self.frame_length * self.num_tx * self.num_rx *
                self.adc_samples * 2 * 2)

    @property
    def chirp_time(self) -> float:
        """Per-TX antenna inter-chirp time T_c, in microseconds."""
        return (self.idle_time + self.ramp_end_time) * self.num_tx

    @property
    def frame_time(self) -> float:
        """Total radar frame time, in ms."""
        return self.chirp_time * self.frame_length / 1e3

    @property
    def sample_time(self) -> float:
        """Total sampling time T_s, in us."""
        return self.adc_samples / self.sample_rate * 1e3

    @property
    def bandwidth(self) -> float:
        """Effective bandwidth, in MHz."""
        return self.freq_slope * self.sample_time

    @property
    def range_resolution(self) -> float:
        """Range resolution, in m."""
        return SPEED_OF_LIGHT / (2 * self.bandwidth * 1e6)

    @property
    def max_range(self) -> float:
        """Maximum range, in m."""
        return self.range_resolution * self.adc_samples

    @property
    def wavelength(self) -> float:
        """Center wavelength, in m."""
        offset_time = self.adc_start_time + self.sample_time / 2
        return SPEED_OF_LIGHT / (
            self.frequency * 1e9 + self.freq_slope * (offset_time) * 1e6)

    @property
    def doppler_resolution(self) -> float:
        """Doppler resolution, in m/s."""
        return (
            self.wavelength / (2 * self.frame_length * self.chirp_time * 1e-6))

    @property
    def max_doppler(self) -> float:
        """Maximum doppler velocity, in m/s."""
        return self.wavelength / (4 * self.chirp_time * 1e-6)

    @property
    def throughput(self) -> float:
        """Average throughput, in bits/sec."""
        return self.frame_size * 8 / self.frame_period * 1e3

    def as_dict(self) -> dict[str, float | int]:
        """Export as dictionary."""
        RADAR_PROPERTIES = [
            "frequency", "idle_time", "adc_start_time", "ramp_end_time",
            "tx_start_time", "freq_slope", "adc_samples", "sample_rate",
            "frame_length", "frame_period"]
        return {k: getattr(self, k) for k in RADAR_PROPERTIES}

    def as_intrinsics(self) -> dict:
        """Export as intrinsics dictionary."""
        RADAR_INTRINSICS = [
            "shape", "range_resolution", "doppler_resolution"]
        return {k: getattr(self, k) for k in RADAR_INTRINSICS}

bandwidth property

bandwidth: float

Effective bandwidth, in MHz.

chirp_time property

chirp_time: float

Per-TX antenna inter-chirp time T_c, in microseconds.

device_type property

device_type: type[XWRBase]

Radar device type.

doppler_resolution property

doppler_resolution: float

Doppler resolution, in m/s.
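The Doppler quantities follow from the chirp time and the center wavelength. The sketch below repeats the formulas from the source listing as a standalone function; the input values, the function name, and the speed-of-light constant are assumptions made for illustration.

```python
SPEED_OF_LIGHT = 299_792_458.0  # m/s; assumed to match the library constant


def doppler_metrics(
    *, frequency_ghz: float, freq_slope: float, adc_start_us: float,
    sample_time_us: float, idle_us: float, ramp_end_us: float,
    num_tx: int, frame_length: int,
) -> dict:
    """Chain chirp_time and wavelength into Doppler resolution and maximum."""
    chirp_time_us = (idle_us + ramp_end_us) * num_tx
    # Frequency at the middle of the ADC sampling window.
    offset_us = adc_start_us + sample_time_us / 2
    center_hz = frequency_ghz * 1e9 + freq_slope * offset_us * 1e6
    wavelength_m = SPEED_OF_LIGHT / center_hz
    return {
        "chirp_time_us": chirp_time_us,
        "wavelength_m": wavelength_m,
        "doppler_resolution": (
            wavelength_m / (2 * frame_length * chirp_time_us * 1e-6)),
        "max_doppler": wavelength_m / (4 * chirp_time_us * 1e-6),
    }


d = doppler_metrics(
    frequency_ghz=77.0, freq_slope=60.0, adc_start_us=6.0,
    sample_time_us=51.2, idle_us=10.0, ramp_end_us=60.0,
    num_tx=3, frame_length=64)
```

Note that max_doppler equals doppler_resolution times half the frame length, so for a fixed chirp time a longer frame refines the resolution without changing the maximum.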

frame_size property

frame_size: int

Radar data cube size, in bytes.
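As a worked example, the frame size and the resulting average throughput can be computed by hand. The antenna counts and frame parameters below are assumed values, and the reading of the two factors of 2 (I and Q components, 2 bytes each) is inferred from the int16 IIQQ layout used elsewhere on this page.

```python
def frame_size(frame_length: int, num_tx: int, num_rx: int,
               adc_samples: int) -> int:
    """Bytes per frame: complex samples (I and Q), 2 bytes (int16) each."""
    return frame_length * num_tx * num_rx * adc_samples * 2 * 2


def throughput(frame_bytes: int, frame_period_ms: float) -> float:
    """Average data rate in bits/sec for one frame every `frame_period_ms`."""
    return frame_bytes * 8 / frame_period_ms * 1e3


size = frame_size(frame_length=64, num_tx=3, num_rx=4, adc_samples=256)
rate = throughput(size, frame_period_ms=100.0)
frames_in_buffer = 6291456 / size  # default DCAConfig.socket_buffer
```

For these values one frame is 786432 bytes, the average rate is about 63 Mbps, and the default socket buffer holds 8 frames.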

frame_time property

frame_time: float

Total radar frame time, in ms.

max_doppler property

max_doppler: float

Maximum doppler velocity, in m/s.

max_range property

max_range: float

Maximum range, in m.

num_rx property

num_rx: int

Number of RX antennas.

num_tx property

num_tx: int

Number of TX antennas.

range_resolution property

range_resolution: float

Range resolution, in m.
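Range resolution is derived from the sampling time and the chirp slope. The sketch below follows the same chain as the source listing (sample_time, bandwidth, range_resolution, max_range) with assumed input values; the function name and the speed-of-light constant are illustrative.

```python
SPEED_OF_LIGHT = 299_792_458.0  # m/s; assumed to match the library constant


def range_metrics(adc_samples: int, sample_rate_khz: float,
                  freq_slope_mhz_per_us: float) -> dict:
    """Follow the chain sample_time -> bandwidth -> resolution -> max range."""
    sample_time_us = adc_samples / sample_rate_khz * 1e3
    bandwidth_mhz = freq_slope_mhz_per_us * sample_time_us
    resolution_m = SPEED_OF_LIGHT / (2 * bandwidth_mhz * 1e6)
    return {
        "sample_time_us": sample_time_us,
        "bandwidth_mhz": bandwidth_mhz,
        "range_resolution_m": resolution_m,
        "max_range_m": resolution_m * adc_samples,
    }


m = range_metrics(adc_samples=256, sample_rate_khz=5000,
                  freq_slope_mhz_per_us=60.0)
```

With these inputs the sampling window is 51.2 us and the effective bandwidth about 3072 MHz, giving a resolution of just under 5 cm and a maximum range of roughly 12.5 m.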

raw_shape property

raw_shape: tuple[int, int, int, int]

Radar IIQQ data shape.
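The last axis of raw_shape is twice adc_samples because each complex sample is stored as two integers. The sketch below de-interleaves one chirp under the assumption that "IIQQ" means two in-phase values followed by the two matching quadrature values; verify the ordering against your device before relying on it, since I and Q may be swapped. The function name is hypothetical.

```python
def iq_from_iiqq(raw: list[int]) -> list[complex]:
    """De-interleave [I0, I1, Q0, Q1, I2, I3, Q2, Q3, ...] to complex."""
    if len(raw) % 4:
        raise ValueError("IIQQ data must come in groups of 4 values")
    out = []
    for k in range(0, len(raw), 4):
        i0, i1, q0, q1 = raw[k:k + 4]
        out += [complex(i0, q0), complex(i1, q1)]
    return out


chirp = [1, 2, 10, 20, 3, 4, 30, 40]   # 2 * adc_samples values
samples = iq_from_iiqq(chirp)          # adc_samples complex values
```

Here 8 raw values become 4 complex samples, which mirrors the relationship between raw_shape and shape.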

sample_time property

sample_time: float

Total sampling time T_s, in us.

shape property

shape: tuple[int, int, int, int]

Radar data cube shape.

throughput property

throughput: float

Average throughput, in bits/sec.

wavelength property

wavelength: float

Center wavelength, in m.

as_dict

as_dict() -> dict[str, float | int]

Export as dictionary.

Source code in src/xwr/config.py
def as_dict(self) -> dict[str, float | int]:
    """Export as dictionary."""
    RADAR_PROPERTIES = [
        "frequency", "idle_time", "adc_start_time", "ramp_end_time",
        "tx_start_time", "freq_slope", "adc_samples", "sample_rate",
        "frame_length", "frame_period"]
    return {k: getattr(self, k) for k in RADAR_PROPERTIES}

as_intrinsics

as_intrinsics() -> dict

Export as intrinsics dictionary.

Source code in src/xwr/config.py
def as_intrinsics(self) -> dict:
    """Export as intrinsics dictionary."""
    RADAR_INTRINSICS = [
        "shape", "range_resolution", "doppler_resolution"]
    return {k: getattr(self, k) for k in RADAR_INTRINSICS}