Skip to content

xwr.rsp

Radar Signal Processing for batched 4D spectrum.

Image Axis Order

Elevation and azimuth axes are in "image order": increasing index is down and to the right, respectively.

To use the RSP:

  1. Pick your backend. Currently, we support numpy, jax, and pytorch.

    Note

    Each RSP backend is not imported by default; you must explicitly import the backend you want to use, and make sure its dependencies are installed (pytorch, jax, etc).

    from xwr.rsp import numpy as rsp
    # or
    from xwr.rsp import jax as rsp
    

  2. Select the appropriate radar model.

  3. Import the RSP class matching your backend and radar:

    from xwr.rsp import RSP
    from xwr.rsp.torch import AWR1843AOP
    
    rsp: RSP = AWR1843AOP()
    

    Tip

    Use xwr.rsp.RSP as the type for a generic RSP, and RSP[np.ndarray], RSP[jax.Array], RSP[torch.Tensor], etc for a RSP with a specific backend.

xwr.rsp.RSP

Bases: ABC, Generic[TArray]

Abstract, backend-agnostic Radar Signal Processing base class.

Info

This class documents the public interface for all radar signal processing (RSP) classes, except where otherwise noted.

Type Parameters
  • TArray: Generic backend, e.g., np.ndarray, jax jax.Array, or torch Tensor.

Parameters:

Name Type Description Default
window bool | dict[Literal['range', 'doppler', 'azimuth', 'elevation'], bool]

whether to apply a hanning window. If bool, the same option is applied to all axes. If dict, specify per axis with keys "range", "doppler", "azimuth", and "elevation".

False
size dict[Literal['range', 'doppler', 'azimuth', 'elevation'], int]

target size for each axis after zero-padding, specified by axis. If an axis is not spacified, it is not padded.

{}
Source code in src/xwr/rsp/generic.py
class RSP(ABC, Generic[TArray]):
    """Abstract, backend-agnostic Radar Signal Processing base class.

    !!! info

        This class documents the public interface for all radar signal
        processing (RSP) classes, except where otherwise noted.

    Type Parameters:
        - `TArray`: Generic backend, e.g., `np.ndarray`, jax `jax.Array`, or
            torch `Tensor`.

    Args:
        window: whether to apply a hanning window. If `bool`, the same option
            is applied to all axes. If `dict`, specify per axis with keys
            "range", "doppler", "azimuth", and "elevation".
        size: target size for each axis after zero-padding, specified by axis.
            If an axis is not spacified, it is not padded.
    """

    def __init__(
        self, window: bool | dict[
            Literal["range", "doppler", "azimuth", "elevation"], bool] = False,
        size: dict[
            Literal["range", "doppler", "azimuth", "elevation"], int] = {}
    ) -> None:
        self.window: dict[
            Literal["range", "doppler", "azimuth", "elevation"], bool]
        self._default_window: bool | dict[
            Literal["range", "doppler", "azimuth", "elevation"], bool]

        if isinstance(window, bool):
            self.window = {}
            self._default_window = window
        else:
            self.window = window
            self._default_window = False

        self.size = size

    @abstractmethod
    def fft(
        self, array: Complex64[TArray, "..."],
        axes: tuple[int, ...],
        size: tuple[int, ...] | None = None,
        shift: tuple[int, ...] | None = None
    ) -> Complex64[TArray, "..."]:
        """Compute FFT on the specified axes of the array.

        Args:
            array: Input array.
            size: Target size for each axis after FFT (or `None` to use the
                input size).
            axes: Axes along which to compute the FFT.
            shift: Axes to shift after FFT, if any.

        Returns:
            FFT of the input array along the specified axes.
        """
        ...

    @staticmethod
    @abstractmethod
    def hann(
        iq: Complex64[TArray, "..."], axis: int
    ) -> Complex64[TArray, "..."]:
        """Apply a Hann window to the specified axis of the IQ data.

        Args:
            iq: IQ data.
            axis: Axis along which to apply the Hann window.

        Returns:
            IQ data with the Hann window applied along the specified axis.
        """
        ...

    def doppler_range(
        self, iq: Complex64[TArray, "#batch doppler tx rx range"]
    ) -> Complex64[TArray, "#batch doppler2 tx rx range2"]:
        """Calculate range-doppler spectrum from IQ data.

        Args:
            iq: IQ data.

        Returns:
            Computed range-doppler spectrum, with windowing if specified.
        """
        if self.window.get("range", self._default_window):
            iq = self.hann(iq, 4)
        if self.window.get("doppler", self._default_window):
            iq = self.hann(iq, 1)

        return self.fft(
            iq, axes=(1, 4), shift=(1,),
            size=(
                self.size.get("doppler", iq.shape[1]),
                self.size.get("range", iq.shape[4])))

    @abstractmethod
    def mimo_virtual_array(
        self, rd: Complex64[TArray, "#batch doppler tx rx range"]
    ) -> Complex64[TArray, "#batch doppler elevation azimuth range"]:
        """Set up MIMO virtual array from range-doppler spectrum.

        Args:
            rd: range-doppler spectrum.

        Returns:
            Computed MIMO virtual array, in elevation-azimuth order.
        """
        ...

    def elevation_azimuth(
        self, rd: Complex64[TArray, "#batch doppler tx rx range"]
    ) -> Complex64[TArray, "#batch doppler el az range"]:
        """Calculate elevation-azimuth spectrum from range-doppler spectrum.

        Args:
            rd: range-doppler spectrum.

        Returns:
            Computed elevation-azimuth spectrum, with windowing and padding if
                specified.
        """
        mimo = self.mimo_virtual_array(rd)

        if self.window.get("elevation", self._default_window):
            mimo = self.hann(mimo, 2)
        if self.window.get("azimuth", self._default_window):
            mimo = self.hann(mimo, 3)

        return self.fft(
            mimo, axes=(2, 3), shift=(2, 3),
            size=(
                self.size.get("elevation", mimo.shape[2]),
                self.size.get("azimuth", mimo.shape[3])))

    def __call__(
        self, iq: Complex64[TArray, "#batch doppler tx rx _range"]
            | Int16[TArray, "#batch doppler tx rx _range"]
    ) -> Complex64[TArray, "#batch doppler2 el az _range"]:
        """Process IQ data to compute elevation-azimuth spectrum.

        Args:
            iq: IQ data in complex or interleaved int16 IQ format.

        Returns:
            Computed doppler-elevation-azimuth-range spectrum.
        """
        uninterleaved = iq_from_iiqq(iq)
        dr = self.doppler_range(uninterleaved)
        drae = self.elevation_azimuth(dr)
        return drae

__call__

__call__(
    iq: Complex64[TArray, "#batch doppler tx rx _range"]
    | Int16[TArray, "#batch doppler tx rx _range"],
) -> Complex64[TArray, "#batch doppler2 el az _range"]

Process IQ data to compute elevation-azimuth spectrum.

Parameters:

Name Type Description Default
iq Complex64[TArray, '#batch doppler tx rx _range'] | Int16[TArray, '#batch doppler tx rx _range']

IQ data in complex or interleaved int16 IQ format.

required

Returns:

Type Description
Complex64[TArray, '#batch doppler2 el az _range']

Computed doppler-elevation-azimuth-range spectrum.

Source code in src/xwr/rsp/generic.py
def __call__(
    self, iq: Complex64[TArray, "#batch doppler tx rx _range"]
        | Int16[TArray, "#batch doppler tx rx _range"]
) -> Complex64[TArray, "#batch doppler2 el az _range"]:
    """Process IQ data to compute elevation-azimuth spectrum.

    Args:
        iq: IQ data in complex or interleaved int16 IQ format.

    Returns:
        Computed doppler-elevation-azimuth-range spectrum.
    """
    uninterleaved = iq_from_iiqq(iq)
    dr = self.doppler_range(uninterleaved)
    drae = self.elevation_azimuth(dr)
    return drae

doppler_range

doppler_range(
    iq: Complex64[TArray, "#batch doppler tx rx range"],
) -> Complex64[TArray, "#batch doppler2 tx rx range2"]

Calculate range-doppler spectrum from IQ data.

Parameters:

Name Type Description Default
iq Complex64[TArray, '#batch doppler tx rx range']

IQ data.

required

Returns:

Type Description
Complex64[TArray, '#batch doppler2 tx rx range2']

Computed range-doppler spectrum, with windowing if specified.

Source code in src/xwr/rsp/generic.py
def doppler_range(
    self, iq: Complex64[TArray, "#batch doppler tx rx range"]
) -> Complex64[TArray, "#batch doppler2 tx rx range2"]:
    """Calculate range-doppler spectrum from IQ data.

    Args:
        iq: IQ data.

    Returns:
        Computed range-doppler spectrum, with windowing if specified.
    """
    if self.window.get("range", self._default_window):
        iq = self.hann(iq, 4)
    if self.window.get("doppler", self._default_window):
        iq = self.hann(iq, 1)

    return self.fft(
        iq, axes=(1, 4), shift=(1,),
        size=(
            self.size.get("doppler", iq.shape[1]),
            self.size.get("range", iq.shape[4])))

elevation_azimuth

elevation_azimuth(
    rd: Complex64[TArray, "#batch doppler tx rx range"],
) -> Complex64[TArray, "#batch doppler el az range"]

Calculate elevation-azimuth spectrum from range-doppler spectrum.

Parameters:

Name Type Description Default
rd Complex64[TArray, '#batch doppler tx rx range']

range-doppler spectrum.

required

Returns:

Type Description
Complex64[TArray, '#batch doppler el az range']

Computed elevation-azimuth spectrum, with windowing and padding if specified.

Source code in src/xwr/rsp/generic.py
def elevation_azimuth(
    self, rd: Complex64[TArray, "#batch doppler tx rx range"]
) -> Complex64[TArray, "#batch doppler el az range"]:
    """Calculate elevation-azimuth spectrum from range-doppler spectrum.

    Args:
        rd: range-doppler spectrum.

    Returns:
        Computed elevation-azimuth spectrum, with windowing and padding if
            specified.
    """
    mimo = self.mimo_virtual_array(rd)

    if self.window.get("elevation", self._default_window):
        mimo = self.hann(mimo, 2)
    if self.window.get("azimuth", self._default_window):
        mimo = self.hann(mimo, 3)

    return self.fft(
        mimo, axes=(2, 3), shift=(2, 3),
        size=(
            self.size.get("elevation", mimo.shape[2]),
            self.size.get("azimuth", mimo.shape[3])))

fft abstractmethod

fft(
    array: Complex64[TArray, ...],
    axes: tuple[int, ...],
    size: tuple[int, ...] | None = None,
    shift: tuple[int, ...] | None = None,
) -> Complex64[TArray, ...]

Compute FFT on the specified axes of the array.

Parameters:

Name Type Description Default
array Complex64[TArray, ...]

Input array.

required
size tuple[int, ...] | None

Target size for each axis after FFT (or None to use the input size).

None
axes tuple[int, ...]

Axes along which to compute the FFT.

required
shift tuple[int, ...] | None

Axes to shift after FFT, if any.

None

Returns:

Type Description
Complex64[TArray, ...]

FFT of the input array along the specified axes.

Source code in src/xwr/rsp/generic.py
@abstractmethod
def fft(
    self, array: Complex64[TArray, "..."],
    axes: tuple[int, ...],
    size: tuple[int, ...] | None = None,
    shift: tuple[int, ...] | None = None
) -> Complex64[TArray, "..."]:
    """Compute FFT on the specified axes of the array.

    Args:
        array: Input array.
        size: Target size for each axis after FFT (or `None` to use the
            input size).
        axes: Axes along which to compute the FFT.
        shift: Axes to shift after FFT, if any.

    Returns:
        FFT of the input array along the specified axes.
    """
    ...

hann abstractmethod staticmethod

hann(iq: Complex64[TArray, ...], axis: int) -> Complex64[TArray, ...]

Apply a Hann window to the specified axis of the IQ data.

Parameters:

Name Type Description Default
iq Complex64[TArray, ...]

IQ data.

required
axis int

Axis along which to apply the Hann window.

required

Returns:

Type Description
Complex64[TArray, ...]

IQ data with the Hann window applied along the specified axis.

Source code in src/xwr/rsp/generic.py
@staticmethod
@abstractmethod
def hann(
    iq: Complex64[TArray, "..."], axis: int
) -> Complex64[TArray, "..."]:
    """Apply a Hann window to the specified axis of the IQ data.

    Args:
        iq: IQ data.
        axis: Axis along which to apply the Hann window.

    Returns:
        IQ data with the Hann window applied along the specified axis.
    """
    ...

mimo_virtual_array abstractmethod

mimo_virtual_array(
    rd: Complex64[TArray, "#batch doppler tx rx range"],
) -> Complex64[TArray, "#batch doppler elevation azimuth range"]

Set up MIMO virtual array from range-doppler spectrum.

Parameters:

Name Type Description Default
rd Complex64[TArray, '#batch doppler tx rx range']

range-doppler spectrum.

required

Returns:

Type Description
Complex64[TArray, '#batch doppler elevation azimuth range']

Computed MIMO virtual array, in elevation-azimuth order.

Source code in src/xwr/rsp/generic.py
@abstractmethod
def mimo_virtual_array(
    self, rd: Complex64[TArray, "#batch doppler tx rx range"]
) -> Complex64[TArray, "#batch doppler elevation azimuth range"]:
    """Set up MIMO virtual array from range-doppler spectrum.

    Args:
        rd: range-doppler spectrum.

    Returns:
        Computed MIMO virtual array, in elevation-azimuth order.
    """
    ...

xwr.rsp.iq_from_iiqq

iq_from_iiqq(
    iiqq: Int16[TArray, "... n"] | Complex64[TArray, "... _n"],
) -> Complex64[TArray, "... n2"]

Un-interleave IIQQ data.

Type Parameters
  • TArray: This function is multi-backend, and supports numpy np.ndarray, jax jax.Array, and torch Tensor.

Parameters:

Name Type Description Default
iiqq Int16[TArray, '... n'] | Complex64[TArray, '... _n']

interleaved IIQQ data; see RadarFrame. If already complex, leave it as is.

required

Returns:

Type Description
Complex64[TArray, '... n2']

Complex IQ data.

Source code in src/xwr/rsp/generic.py
def iq_from_iiqq(
    iiqq: Int16[TArray, "... n"] | Complex64[TArray, "... _n"],
) -> Complex64[TArray, "... n2"]:
    """Un-interleave IIQQ data.

    Type Parameters:
        - `TArray`: This function is multi-backend, and supports numpy
            `np.ndarray`, jax `jax.Array`, and torch `Tensor`.

    Args:
        iiqq: interleaved IIQQ data; see [`RadarFrame`][xwr.capture.types.].
            If already complex, leave it as is.

    Returns:
        Complex IQ data.
    """
    shape = (*iiqq.shape[:-1], iiqq.shape[-1] // 2)

    backend = _check_backend(iiqq)
    if backend == "numpy":
        assert isinstance(iiqq, np.ndarray)

        if iiqq.dtype == np.complex64:
            return iiqq
        iq = np.zeros(shape, dtype=np.complex64)
        iq[..., 0::2] = 1j * iiqq[..., 0::4] + iiqq[..., 2::4]
        iq[..., 1::2] = 1j * iiqq[..., 1::4] + iiqq[..., 3::4]
        return cast(Complex64[TArray, "... n/2"], iq)

    elif backend == "jax":
        from jax import numpy as jnp
        assert isinstance(iiqq, jnp.ndarray)

        if iiqq.dtype == jnp.complex64:
            return iiqq
        iq = jnp.zeros(
            shape, dtype=jnp.complex64
        ).at[..., 0::2].set(1j * iiqq[..., 0::4] + iiqq[..., 2::4]
        ).at[..., 1::2].set(1j * iiqq[..., 1::4] + iiqq[..., 3::4])
        return cast(Complex64[TArray, "... n/2"], iq)

    else: # backend == "torch"
        import torch
        assert isinstance(iiqq, torch.Tensor)

        if iiqq.dtype == torch.complex64:
            return iiqq
        iq = torch.zeros(shape, dtype=torch.complex64, device=iiqq.device)
        iq[..., 0::2] = 1j * iiqq[..., 0::4] + iiqq[..., 2::4]
        iq[..., 1::2] = 1j * iiqq[..., 1::4] + iiqq[..., 3::4]
        return cast(Complex64[TArray, "... n/2"], iq)

xwr.rsp.iqiq_from_iiqq

iqiq_from_iiqq(iiqq: Int16[TArray, '... n']) -> Int16[TArray, '... n/2 2']

Un-interleave IIQQ data.

Type Parameters
  • TArray: This function is multi-backend, and supports numpy np.ndarray, jax jax.Array, and torch Tensor.

Parameters:

Name Type Description Default
iiqq Int16[TArray, '... n']

interleaved IIQQ data; see RadarFrame.

required

Returns:

Type Description
Int16[TArray, '... n/2 2']

IQ data in an uninterleaved format with a trailing I/Q axis.

Source code in src/xwr/rsp/generic.py
def iqiq_from_iiqq(
    iiqq: Int16[TArray, "... n"]
) -> Int16[TArray, "... n/2 2"]:
    """Un-interleave IIQQ data.

    Type Parameters:
        - `TArray`: This function is multi-backend, and supports numpy
            `np.ndarray`, jax `jax.Array`, and torch `Tensor`.

    Args:
        iiqq: interleaved IIQQ data; see [`RadarFrame`][xwr.capture.types.].

    Returns:
        IQ data in an uninterleaved format with a trailing I/Q axis.
    """
    shape = (*iiqq.shape[:-1], iiqq.shape[-1] // 2)

    backend = _check_backend(iiqq)
    if backend == "numpy":
        assert isinstance(iiqq, np.ndarray)

        iq = np.zeros((*shape, 2), dtype=np.int16)
        iq[..., 0::2, 1] = iiqq[..., 0::4]
        iq[..., 1::2, 1] = iiqq[..., 1::4]
        iq[..., 0::2, 0] = iiqq[..., 2::4]
        iq[..., 1::2, 0] = iiqq[..., 3::4]
        return cast(Int16[TArray, "... n/2 2"], iq)

    elif backend == "jax":
        from jax import numpy as jnp
        assert isinstance(iiqq, jnp.ndarray)

        iq = jnp.zeros(
            (*shape, 2), dtype=jnp.int16
        ).at[..., 0::2, 1].set(iiqq[..., 0::4]
        ).at[..., 1::2, 1].set(iiqq[..., 1::4]
        ).at[..., 0::2, 0].set(iiqq[..., 2::4]
        ).at[..., 1::2, 0].set(iiqq[..., 3::4])
        return cast(Int16[TArray, "... n/2 2"], iq)

    else:  # backend == "torch"
        import torch
        assert isinstance(iiqq, torch.Tensor)

        iq = torch.zeros((*shape, 2), dtype=torch.int16, device=iiqq.device)
        iq[..., 0::2, 1] = iiqq[..., 0::4]
        iq[..., 1::2, 1] = iiqq[..., 1::4]
        iq[..., 0::2, 0] = iiqq[..., 2::4]
        iq[..., 1::2, 0] = iiqq[..., 3::4]
        return cast(Int16[TArray, "... n/2 2"], iq)