
roverd.channels.utils

Channel utilities and types.

roverd.channels.utils.Data module-attribute

Data = ndarray | bytes | bytearray

Generic writable data.

Should generally behave as follows:

  • If Shaped[np.ndarray, "..."], the shape and dtype are assumed to have semantic meaning, and are verified.
  • If bytes or bytearray, we assume that the caller has already done any necessary binary conversion. No type or shape verification is performed.

roverd.channels.utils.Streamable module-attribute

Streamable = Iterator[T] | Iterable[T] | Queue[T]

Any stream-like container.

roverd.channels.utils.Buffer

Bases: Generic[T]

Simple queue buffer (i.e. queue to iterator) with batching.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| queue | Queue[T] | queue to use as a buffer. Should return None when the stream is complete (i.e. StopIteration). | required |

Source code in format/src/roverd/channels/utils.py
class Buffer(Generic[T]):
    """Simple queue buffer (i.e. queue to iterator) with batching.

    Args:
        queue: queue to use as a buffer. Should return `None` when the stream
            is complete (i.e. `StopIteration`).
    """

    def __init__(self, queue: Queue[T]) -> None:
        self.queue = queue

    def __iter__(self):
        return self

    def __next__(self) -> T:
        item = self.queue.get()
        if item is None:
            raise StopIteration
        else:
            return item
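
A minimal usage sketch, assuming a producer thread that follows the `None` convention. `Buffer` is restated from the listing above so the snippet runs standalone; `produce` is a hypothetical producer, not part of the library.

```python
from queue import Queue
from threading import Thread
from typing import Generic, TypeVar

T = TypeVar("T")


class Buffer(Generic[T]):
    """Restated from the listing above so the sketch runs standalone."""

    def __init__(self, queue: Queue[T]) -> None:
        self.queue = queue

    def __iter__(self):
        return self

    def __next__(self) -> T:
        item = self.queue.get()  # blocks until the producer puts something
        if item is None:
            raise StopIteration
        return item


def produce(queue: Queue, n: int) -> None:
    """Hypothetical producer: push n items, then the None sentinel."""
    for i in range(n):
        queue.put(i * i)
    queue.put(None)  # without this, the consumer blocks forever in get()


queue: Queue = Queue()
Thread(target=produce, args=(queue, 5), daemon=True).start()

squares = list(Buffer(queue))
print(squares)  # [0, 1, 4, 9, 16]
```

Because the check is `item is None`, falsy values such as `0` pass through unchanged; only `None` itself ends the iteration.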

roverd.channels.utils.Prefetch

Bases: Buffer

Simple prefetch queue wrapper (i.e. iterator to queue).

Can be used as a prefetched iterator (for x in Prefetch(...)), or as a queue (Prefetch(...).queue). When used as a queue, None is put in the queue to indicate that the iterator has terminated.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| iterator | Iterable[T] \| Iterator[T] | any Python iterator; must never yield None. | required |
| size | int | prefetch buffer size. | 64 |

Source code in format/src/roverd/channels/utils.py
class Prefetch(Buffer):
    """Simple prefetch queue wrapper (i.e. iterator to queue).

    Can be used as a prefetched iterator (`for x in Prefetch(...)`), or as a
    queue (`Prefetch(...).queue`). When used as a queue, `None` is put in the
    queue to indicate that the iterator has terminated.

    Args:
        iterator: any python iterator; must never yield `None`.
        size: prefetch buffer size.
    """

    def __init__(
        self, iterator: Iterable[T] | Iterator[T], size: int = 64
    ) -> None:
        super().__init__(queue=Queue(maxsize=size))
        self.iterator = iterator

        Thread(target=self._prefetch, daemon=True).start()

    def _prefetch(self) -> None:
        for item in self.iterator:
            self.queue.put(item)
        self.queue.put(None)

roverd.channels.utils.batch_iterator

batch_iterator(
    iterator: Iterator[T] | Iterable[T], size: int = 8
) -> Iterator[list[T]]

Convert an iterator into a batched version.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| iterator | Iterator[T] \| Iterable[T] | input iterator/iterable. | required |
| size | int | batch size. | 8 |

Source code in format/src/roverd/channels/utils.py
def batch_iterator(
    iterator: Iterator[T] | Iterable[T], size: int = 8
) -> Iterator[list[T]]:
    """Convert an iterator into a batched version.

    Args:
        iterator: input iterator/iterable.
        size: batch size.
    """
    buf = []
    for item in iterator:
        buf.append(item)
        if len(buf) == size:
            yield buf
            buf = []
    if len(buf) != 0:
        yield buf