darcyai.input.input_multi_stream

InputMultiStream Objects

class InputMultiStream()

A class that represents a collection of input streams.

Arguments

  • aggregator (Callable[[None], StreamData]): a function that takes a list of data and returns a single data point
  • callback (Callable[[StreamData], None]): a function that gets called when data is received from a stream

Examples

>>> from darcyai.input.input_multi_stream import InputMultiStream
>>> from darcyai.stream_data import StreamData

>>> def aggregator():
...     return StreamData("data", 1234567890)

>>> def callback(data: StreamData):
...     print(data.data, data.timestamp)

>>> input_stream = InputMultiStream(aggregator, callback)

remove_stream

def remove_stream(name: str) -> None

Removes a stream from the collection.

Arguments

  • name (str): the name of the stream to remove

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.input.input_multi_stream import InputMultiStream

>>> usb_camera = CameraStream(video_device="/dev/video0")

>>> input_stream = InputMultiStream(aggregator, callback)
>>> input_stream.add_stream("usb_camera", usb_camera)

>>> input_stream.remove_stream("usb_camera")

get_stream

def get_stream(name: str) -> InputStream

Gets a stream from the collection.

Arguments

  • name (str): the name of the stream to get

Returns

InputStream: the stream with the given name

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.input.input_multi_stream import InputMultiStream

>>> usb_camera = CameraStream(video_device="/dev/video0")

>>> input_stream = InputMultiStream(aggregator, callback)
>>> input_stream.add_stream("usb_camera", usb_camera)

>>> input_stream.get_stream("usb_camera")

add_stream

def add_stream(name: str, stream: InputStream) -> None

Adds a stream to the collection.

Arguments

  • name (str): the name of the stream to add
  • stream (InputStream): the stream to add

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.input.input_multi_stream import InputMultiStream

>>> usb_camera = CameraStream(video_device="/dev/video0")

>>> input_stream = InputMultiStream(aggregator, callback)
>>> input_stream.add_stream("usb_camera", usb_camera)

stream

def stream() -> Iterable[StreamData]

Starts streaming data from all streams in the collection.

Returns

Iterable[StreamData]: an iterable of data from all streams

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.input.input_multi_stream import InputMultiStream

>>> usb_camera = CameraStream(video_device="/dev/video0")

>>> input_stream = InputMultiStream(aggregator, callback)
>>> input_stream.add_stream("usb_camera", usb_camera)

>>> input_stream.stream()

stop

def stop() -> None

Stops streaming data from all streams in the collection.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.input.input_multi_stream import InputMultiStream

>>> usb_camera = CameraStream(video_device="/dev/video0")

>>> input_stream = InputMultiStream(aggregator, callback)
>>> input_stream.add_stream("usb_camera", usb_camera)

>>> input_stream.stop()