Skip to content

xwr

TI mmWave Radar + DCA1000EVM Capture Card Raw Data Capture API.

Usage

To use the high-level API, create a XWRConfig and DCAConfig; then pass these to the XWRSystem. Use stream or qstream to automatically configure, start, and stream spectrum data from the radar.

xwr.XWRSystem

Bases: Generic[TRadar]

Radar capture system with a mmWave Radar and DCA1000EVM.

This high-level API provides three different ways to read frames, each with different performance characteristics:

  • stream: a simple iterator which yields successive frames. No buffering is done, which may lead to dropped packets if the consumer is too slow.
  • qstream: a queue-based interface, where a separate thread reads frames into a queue. This prevents dropped packets, but may lead to unbounded latency if the consumer is too slow.
  • dstream: a "drop frame" iterator, which yields the most recent frame, dropping any frames received while the consumer is processing a frame.

These correspond to the following use cases:

Method Use case Performance characteristics
stream Debugging, user-managed real-time systems No buffering
qstream Data collection No dropped packets, but unbounded latency
dstream Real-time demos No dropped packets, but may drop frames

Known Constraints

The XWRSystem will check for known constraints on initialization, and warn or raise if any are violated. See xwr.constraints for the full list.

Type Parameters
  • TRadar: radar type (subclass of XWRBase)

Parameters:

Name Type Description Default
radar XWRConfig | dict

radar configuration; if dict, the key/value pairs are passed to XWRConfig.

required
capture DCAConfig | dict

capture card configuration; if dict, the key/value pairs are passed to DCAConfig.

required
name str

friendly name for logging; can be default.

'RadarCapture'
strict bool

if True, raise an error instead of logging a warning if the radar configuration contains potentially invalid values.

False
Source code in src/xwr/system.py
class XWRSystem(Generic[TRadar]):
    """Radar capture system with a mmWave Radar and DCA1000EVM.

    This high-level API provides three different ways to read frames, each
    with different performance characteristics:

    - [`stream`][.]: a simple iterator which yields successive frames. No
        buffering is done, which may lead to dropped packets if the consumer
        is too slow.
    - [`qstream`][.]: a queue-based interface, where a separate thread reads
        frames into a queue. This prevents dropped packets, but may lead to
        unbounded latency if the consumer is too slow.
    - [`dstream`][.]: a "drop frame" iterator, which yields the most recent
        frame, dropping any frames received while the consumer is processing a
        frame.

    These correspond to the following use cases:

    | Method | Use case | Performance characteristics |
    |---|---|---|
    | [`stream`][.] | Debugging, user-managed real-time systems | No buffering |
    | [`qstream`][.] | Data collection | No dropped packets, but unbounded latency |
    | [`dstream`][.] | Real-time demos | No dropped packets, but may drop frames |

    !!! info "Known Constraints"

        The `XWRSystem` will check for known constraints on initialization,
        and warn or raise if any are violated. See
        [`xwr.constraints`][xwr.constraints] for the full list.

    Type Parameters:
        - `TRadar`: radar type (subclass of [`XWRBase`][xwr.radar.])

    Args:
        radar: radar configuration; if `dict`, the key/value pairs are passed
            to `XWRConfig`.
        capture: capture card configuration; if `dict`, the key/value pairs are
            passed to `DCAConfig`.
        name: friendly name for logging; can be default.
        strict: if `True`, raise an error instead of logging a warning if the
            radar configuration contains potentially invalid values.
    """

    def __init__(
        self, *, radar: XWRConfig | dict, capture: DCAConfig | dict,
        name: str = "RadarCapture", strict: bool = False
    ) -> None:
        if isinstance(radar, dict):
            radar = XWRConfig(**radar)
        if isinstance(capture, dict):
            capture = DCAConfig(**capture)

        self.log: logging.Logger = logging.getLogger(name)
        self.strict = strict
        self._check_config(radar, capture)

        self.dca: DCA1000EVM = capture.create()
        self.xwr: TRadar = cast(
            type[TRadar], radar.device_type)(port=radar.port)

        self.config = radar
        self.fps: float = 1000.0 / radar.frame_period

    def _check_config(self, radar: XWRConfig, capture: DCAConfig) -> None:
        """Check config, and raise if strict mode is enabled and invalid."""
        for r in check_config(radar, capture):
            if r.passed is False and self.strict:
                raise ValueError(
                    f"Invalid configuration | {r.constraint.__name__}: {r.detail}")

    def stream(self) -> Iterator[types.RadarFrame]:
        """Iterator which yields successive frames.

        !!! note

            `.stream()` does not internally terminate data collection;
            another worker must call [`stop`][..].

        Yields:
            Read frames; the iterator terminates when the capture card stream
                times out.
        """
        # send a "stop" command in case the capture card is still running
        self.dca.stop()
        # reboot radar in case it is stuck
        self.dca.reset_ar_device()
        # clear buffer from possible previous data collection
        # (will mess up byte count indices if we don't)
        self.dca.flush()

        # start capture card & radar
        self.dca.start()
        self.xwr.setup(**self.config.as_dict())
        self.xwr.start()

        return self.dca.stream(self.config.raw_shape)

    @overload
    def qstream(self, numpy: Literal[True]) -> Queue[np.ndarray | None]: ...

    @overload
    def qstream(
        self, numpy: Literal[False] = False
    ) -> Queue[types.RadarFrame | None]: ...

    def qstream(
        self, numpy: bool = False
    ) -> Queue[types.RadarFrame | None] | Queue[np.ndarray | None]:
        """Read into a queue from a threaded worker.

        The threaded worker is run with `daemon=True`. Like [`stream`][..],
        `.qstream()` also relies on another worker to trigger [`stop`][..].

        !!! note

            If a `TimeoutError` is received (e.g. after `.stop()`), the
            error is caught, and the stream is halted.

        Args:
            numpy: yield a numpy array instead of a `RadarFrame`.

        Returns:
            A queue of `RadarFrame` (or np.ndarray) read by the capture card.
                When the stream terminates, `None` is written to the queue.
        """
        out: Queue[types.RadarFrame | None] | Queue[np.ndarray | None] = Queue()

        def worker():
            try:
                for frame in self.stream():
                    if numpy:
                        if frame is not None:
                            frame = np.frombuffer(
                                frame.data, dtype=np.int16
                            ).reshape(*self.config.raw_shape)
                        # Type inference can't figure out this overload check
                        cast(Queue[np.ndarray | None], out).put(frame)
                    else:
                        out.put(frame)
            except TimeoutError:
                pass
            out.put(None)

        threading.Thread(target=worker, daemon=True).start()
        return out

    @overload
    def dstream(self, numpy: Literal[True]) -> Iterator[np.ndarray]: ...

    @overload
    def dstream(self, numpy: Literal[False]) -> Iterator[types.RadarFrame]: ...

    def dstream(
        self, numpy: bool = False
    ) -> Iterator[types.RadarFrame | np.ndarray]:
        """Stream frames, dropping any frames if the consumer gets behind.

        Args:
            numpy: yield a numpy array instead of a `RadarFrame`.

        Yields:
            Read frames; the iterator terminates when the capture card stream
                times out.
        """
        def drop_frames(q):
            dropped = 0
            latest = q.get(block=True)

            while True:
                try:
                    latest = q.get_nowait()
                    dropped += 1
                except Empty:
                    return latest, dropped

        q = self.qstream(numpy=numpy)
        while True:
            frame, dropped = drop_frames(q)
            if dropped > 0:
                self.log.warning(f"Dropped {dropped} frames.")
            if frame is None:
                break
            else:
                yield frame

    def stop(self) -> None:
        """Stop by halting the capture card and reboot the radar.

        In testing, we found that the radar may ignore commands if the frame
        timings are too tight, which prevents a soft reset. We simply reboot
        the radar via the capture card instead.

        !!! warning

            If you fail to `.stop()` the system before exiting, the radar may
            become non-responsive, and require a power cycle.
        """
        self.dca.stop()
        self.dca.reset_ar_device()

dstream

dstream(numpy: Literal[True]) -> Iterator[ndarray]
dstream(numpy: Literal[False]) -> Iterator[RadarFrame]
dstream(numpy: bool = False) -> Iterator[RadarFrame | ndarray]

Stream frames, dropping any frames if the consumer gets behind.

Parameters:

Name Type Description Default
numpy bool

yield a numpy array instead of a RadarFrame.

False

Yields:

Type Description
RadarFrame | ndarray

Read frames; the iterator terminates when the capture card stream times out.

Source code in src/xwr/system.py
def dstream(
    self, numpy: bool = False
) -> Iterator[types.RadarFrame | np.ndarray]:
    """Stream frames, dropping any frames if the consumer gets behind.

    Args:
        numpy: yield a numpy array instead of a `RadarFrame`.

    Yields:
        Read frames; the iterator terminates when the capture card stream
            times out.
    """
    def drop_frames(q):
        dropped = 0
        latest = q.get(block=True)

        while True:
            try:
                latest = q.get_nowait()
                dropped += 1
            except Empty:
                return latest, dropped

    q = self.qstream(numpy=numpy)
    while True:
        frame, dropped = drop_frames(q)
        if dropped > 0:
            self.log.warning(f"Dropped {dropped} frames.")
        if frame is None:
            break
        else:
            yield frame

qstream

qstream(numpy: Literal[True]) -> Queue[ndarray | None]
qstream(numpy: Literal[False] = False) -> Queue[RadarFrame | None]
qstream(
    numpy: bool = False,
) -> Queue[RadarFrame | None] | Queue[ndarray | None]

Read into a queue from a threaded worker.

The threaded worker is run with daemon=True. Like stream, .qstream() also relies on another worker to trigger stop.

Note

If a TimeoutError is received (e.g. after .stop()), the error is caught, and the stream is halted.

Parameters:

Name Type Description Default
numpy bool

yield a numpy array instead of a RadarFrame.

False

Returns:

Type Description
Queue[RadarFrame | None] | Queue[ndarray | None]

A queue of RadarFrame (or np.ndarray) read by the capture card. When the stream terminates, None is written to the queue.

Source code in src/xwr/system.py
def qstream(
    self, numpy: bool = False
) -> Queue[types.RadarFrame | None] | Queue[np.ndarray | None]:
    """Read into a queue from a threaded worker.

    The threaded worker is run with `daemon=True`. Like [`stream`][..],
    `.qstream()` also relies on another worker to trigger [`stop`][..].

    !!! note

        If a `TimeoutError` is received (e.g. after `.stop()`), the
        error is caught, and the stream is halted.

    Args:
        numpy: yield a numpy array instead of a `RadarFrame`.

    Returns:
        A queue of `RadarFrame` (or np.ndarray) read by the capture card.
            When the stream terminates, `None` is written to the queue.
    """
    out: Queue[types.RadarFrame | None] | Queue[np.ndarray | None] = Queue()

    def worker():
        try:
            for frame in self.stream():
                if numpy:
                    if frame is not None:
                        frame = np.frombuffer(
                            frame.data, dtype=np.int16
                        ).reshape(*self.config.raw_shape)
                    # Type inference can't figure out this overload check
                    cast(Queue[np.ndarray | None], out).put(frame)
                else:
                    out.put(frame)
        except TimeoutError:
            pass
        out.put(None)

    threading.Thread(target=worker, daemon=True).start()
    return out

stop

stop() -> None

Stop by halting the capture card and reboot the radar.

In testing, we found that the radar may ignore commands if the frame timings are too tight, which prevents a soft reset. We simply reboot the radar via the capture card instead.

Warning

If you fail to .stop() the system before exiting, the radar may become non-responsive, and require a power cycle.

Source code in src/xwr/system.py
def stop(self) -> None:
    """Stop by halting the capture card and reboot the radar.

    In testing, we found that the radar may ignore commands if the frame
    timings are too tight, which prevents a soft reset. We simply reboot
    the radar via the capture card instead.

    !!! warning

        If you fail to `.stop()` the system before exiting, the radar may
        become non-responsive, and require a power cycle.
    """
    self.dca.stop()
    self.dca.reset_ar_device()

stream

stream() -> Iterator[RadarFrame]

Iterator which yields successive frames.

Note

.stream() does not internally terminate data collection; another worker must call stop.

Yields:

Type Description
RadarFrame

Read frames; the iterator terminates when the capture card stream times out.

Source code in src/xwr/system.py
def stream(self) -> Iterator[types.RadarFrame]:
    """Iterator which yields successive frames.

    !!! note

        `.stream()` does not internally terminate data collection;
        another worker must call [`stop`][..].

    Yields:
        Read frames; the iterator terminates when the capture card stream
            times out.
    """
    # send a "stop" command in case the capture card is still running
    self.dca.stop()
    # reboot radar in case it is stuck
    self.dca.reset_ar_device()
    # clear buffer from possible previous data collection
    # (will mess up byte count indices if we don't)
    self.dca.flush()

    # start capture card & radar
    self.dca.start()
    self.xwr.setup(**self.config.as_dict())
    self.xwr.start()

    return self.dca.stream(self.config.raw_shape)

xwr.DCAConfig dataclass

DCA1000EVM Capture card configuration.

Attributes:

Name Type Description
sys_ip str

system IP; should be manually configured with a subnet mask of 255.255.255.0.

fpga_ip str

FPGA IP address; either hard-coded or configured.

data_port int

data network port number.

config_port int

configuration network port number.

timeout float

Socket read timeout, in seconds.

socket_buffer int

Network read buffer size; should be less than rmem_max.

delay float

Packet delay for the capture card, in microseconds.

Source code in src/xwr/config.py
@dataclass
class DCAConfig:
    """DCA1000EVM Capture card configuration.

    Attributes:
        sys_ip: system IP; should be manually configured with a subnet mask of
            `255.255.255.0`.
        fpga_ip: FPGA IP address; either hard-coded or configured.
        data_port: data network port number.
        config_port: configuration network port number.
        timeout: Socket read timeout, in seconds.
        socket_buffer: Network read buffer size; should be less than
            [`rmem_max`](https://www.kernel.org/doc/html/latest/admin-guide/sysctl/net.html#rmem-max).
        delay: Packet delay for the capture card, in microseconds.
    """

    sys_ip: str = "192.168.33.30"
    fpga_ip: str = "192.168.33.180"
    data_port: int = 4098
    config_port: int = 4096
    timeout: float = 1.0
    socket_buffer: int = 6291456
    delay: float = 5.0

    @property
    def throughput(self) -> float:
        """Theoretical maximum data rate, in bits/sec."""
        packet_time = (
            defines.DCAConstants.DCA_PACKET_SIZE
            * 8 / defines.DCAConstants.DCA_BITRATE + self.delay / 1e6)
        return 1 / packet_time * defines.DCAConstants.DCA_PACKET_SIZE * 8

    def create(self) -> DCA1000EVM:
        """Initialize and setup capture card from this configuration."""
        dca = DCA1000EVM(
            sys_ip=self.sys_ip, fpga_ip=self.fpga_ip,
            data_port=self.data_port, config_port=self.config_port,
            timeout=self.timeout, socket_buffer=self.socket_buffer)
        dca.setup(delay=self.delay)
        return dca

throughput property

throughput: float

Theoretical maximum data rate, in bits/sec.

create

create() -> DCA1000EVM

Initialize and setup capture card from this configuration.

Source code in src/xwr/config.py
def create(self) -> DCA1000EVM:
    """Initialize and setup capture card from this configuration."""
    dca = DCA1000EVM(
        sys_ip=self.sys_ip, fpga_ip=self.fpga_ip,
        data_port=self.data_port, config_port=self.config_port,
        timeout=self.timeout, socket_buffer=self.socket_buffer)
    dca.setup(delay=self.delay)
    return dca

xwr.XWRConfig dataclass

Radar configuration.

The TI mmWave sensing estimator may be helpful for creating a configuration.

Info

Attributes marked with (derived) are computed from other attributes, i.e., are @property attributes.

Attributes:

Name Type Description
device type[XWRBase] | str

radar device type, or the name of a radar device class in xwr.radar.

frequency float

base frequency, in GHz.

idle_time float

radar timing parameters; in microseconds.

adc_start_time float

radar timing parameters; in microseconds.

ramp_end_time float

radar timing parameters; in microseconds.

tx_start_time float

radar timing parameters; in microseconds.

freq_slope float

chirp slope, in MHz/us.

adc_samples int

number of samples per chirp. Must be a power of two.

sample_rate int

ADC sampling rate, in KHz.

frame_length int

number of chirps per TX antenna per frame. Must be a power of two.

frame_period float

periodicity of frames, in ms.

port str | None

Control serial port (usually /dev/ttyACM0). Use None to auto-detect; see XWRBase.

device_name str

(derived) Device class name; used as constraint lookup key.

device_type type[XWRBase]

(derived) Radar device type.

num_tx int

(derived) Number of TX antennas.

num_rx int

(derived) Number of RX antennas.

shape tuple[int, int, int, int]

(derived) Radar data cube shape.

raw_shape tuple[int, int, int, int]

(derived) Radar IIQQ data shape.

frame_size int

(derived) Radar data cube size, in bytes.

chirp_time float

(derived) Per-TX antenna inter-chirp time T_c, in microseconds.

frame_time float

(derived) Total radar frame time, in ms.

sample_time float

(derived) Total sampling time T_s, in us.

bandwidth float

(derived) Effective bandwidth, in MHz.

range_resolution float

(derived) Range resolution, in m.

max_range float

(derived) Maximum range, in m.

wavelength float

(derived) Center wavelength, in m.

doppler_resolution float

(derived) Doppler resolution, in m/s.

max_doppler float

(derived) Maximum doppler velocity, in m/s.

throughput float

(derived) Average throughput, in bits/sec.

Source code in src/xwr/config.py
@dataclass
class XWRConfig:
    """Radar configuration.

    The [TI mmWave sensing estimator](
    https://dev.ti.com/gallery/view/mmwave/mmWaveSensingEstimator/ver/2.4.0/)
    may be helpful for creating a configuration.

    !!! info

        Attributes marked with *(derived)* are computed from other attributes,
        i.e., are `@property` attributes.

    Attributes:
        device: radar device type, or the name of a radar device class in
            [`xwr.radar`][xwr.radar].
        frequency: base frequency, in GHz.
        idle_time: radar timing parameters; in microseconds.
        adc_start_time: radar timing parameters; in microseconds.
        ramp_end_time: radar timing parameters; in microseconds.
        tx_start_time: radar timing parameters; in microseconds.
        freq_slope: chirp slope, in MHz/us.
        adc_samples: number of samples per chirp. Must be a power of two.
        sample_rate: ADC sampling rate, in KHz.
        frame_length: number of chirps per TX antenna per frame. Must be a
            power of two.
        frame_period: periodicity of frames, in ms.
        port: Control serial port (usually `/dev/ttyACM0`). Use `None` to
            auto-detect; see [`XWRBase`][xwr.radar.XWRBase].
        device_name: *(derived)* Device class name; used as constraint lookup key.
        device_type: *(derived)* Radar device type.
        num_tx: *(derived)* Number of TX antennas.
        num_rx: *(derived)* Number of RX antennas.
        shape: *(derived)* Radar data cube shape.
        raw_shape: *(derived)* Radar IIQQ data shape.
        frame_size: *(derived)* Radar data cube size, in bytes.
        chirp_time: *(derived)* Per-TX antenna inter-chirp time T_c, in microseconds.
        frame_time: *(derived)* Total radar frame time, in ms.
        sample_time: *(derived)* Total sampling time T_s, in us.
        bandwidth: *(derived)* Effective bandwidth, in MHz.
        range_resolution: *(derived)* Range resolution, in m.
        max_range: *(derived)* Maximum range, in m.
        wavelength: *(derived)* Center wavelength, in m.
        doppler_resolution: *(derived)* Doppler resolution, in m/s.
        max_doppler: *(derived)* Maximum doppler velocity, in m/s.
        throughput: *(derived)* Average throughput, in bits/sec.
    """

    device: type[radar.XWRBase] | str
    frequency: float
    idle_time: float
    adc_start_time: float
    ramp_end_time: float
    tx_start_time: float
    freq_slope: float
    adc_samples: int
    sample_rate: int
    frame_length: int
    frame_period: float
    port: str | None = None

    @property
    def device_name(self) -> str:
        return self.device_type.__name__

    @property
    def device_type(self) -> type[radar.XWRBase]:
        if isinstance(self.device, str):
            try:
                return getattr(radar, self.device)
            except AttributeError:
                raise ValueError(f"Unknown radar device: {self.device}")
        else:
            return self.device

    @property
    def num_tx(self) -> int:
        return self.device_type.NUM_TX

    @property
    def num_rx(self) -> int:
        return self.device_type.NUM_RX

    @property
    def shape(self) -> tuple[int, int, int, int]:
        return (
            self.frame_length, self.num_tx, self.num_rx, self.adc_samples)

    @property
    def raw_shape(self) -> tuple[int, int, int, int]:
        return (
            self.frame_length, self.num_tx, self.num_rx, self.adc_samples
            * self.device_type.BYTES_PER_SAMPLE // 2)

    @property
    def frame_size(self) -> int:
        return (self.frame_length * self.num_tx * self.num_rx *
                self.adc_samples * self.device_type.BYTES_PER_SAMPLE)

    @property
    def chirp_time(self) -> float:
        return (self.idle_time + self.ramp_end_time) * self.num_tx

    @property
    def frame_time(self) -> float:
        return self.chirp_time * self.frame_length / 1e3

    @property
    def sample_time(self) -> float:
        return self.adc_samples / self.sample_rate * 1e3

    @property
    def bandwidth(self) -> float:
        return self.freq_slope * self.sample_time

    @property
    def range_resolution(self) -> float:
        return SPEED_OF_LIGHT / (2 * self.bandwidth * 1e6)

    @property
    def max_range(self) -> float:
        return self.range_resolution * self.adc_samples

    @property
    def wavelength(self) -> float:
        offset_time = self.adc_start_time + self.sample_time / 2
        return SPEED_OF_LIGHT / (
            self.frequency * 1e9 + self.freq_slope * (offset_time) * 1e6)

    @property
    def doppler_resolution(self) -> float:
        return (
            self.wavelength / (2 * self.frame_length * self.chirp_time * 1e-6))

    @property
    def max_doppler(self) -> float:
        return self.wavelength / (4 * self.chirp_time * 1e-6)

    @property
    def throughput(self) -> float:
        return self.frame_size * 8 / self.frame_period * 1e3

    def as_dict(self) -> dict[str, float | int]:
        """Export as dictionary."""
        RADAR_PROPERTIES = [
            "frequency", "idle_time", "adc_start_time", "ramp_end_time",
            "tx_start_time", "freq_slope", "adc_samples", "sample_rate",
            "frame_length", "frame_period"]
        return {k: getattr(self, k) for k in RADAR_PROPERTIES}

    def as_intrinsics(self) -> dict:
        """Export as intrinsics dictionary."""
        RADAR_INTRINSICS = [
            "shape", "range_resolution", "doppler_resolution"]
        return {k: getattr(self, k) for k in RADAR_INTRINSICS}

as_dict

as_dict() -> dict[str, float | int]

Export as dictionary.

Source code in src/xwr/config.py
def as_dict(self) -> dict[str, float | int]:
    """Export as dictionary."""
    RADAR_PROPERTIES = [
        "frequency", "idle_time", "adc_start_time", "ramp_end_time",
        "tx_start_time", "freq_slope", "adc_samples", "sample_rate",
        "frame_length", "frame_period"]
    return {k: getattr(self, k) for k in RADAR_PROPERTIES}

as_intrinsics

as_intrinsics() -> dict

Export as intrinsics dictionary.

Source code in src/xwr/config.py
def as_intrinsics(self) -> dict:
    """Export as intrinsics dictionary."""
    RADAR_INTRINSICS = [
        "shape", "range_resolution", "doppler_resolution"]
    return {k: getattr(self, k) for k in RADAR_INTRINSICS}