darcyai.input.input_multi_stream
InputMultiStream Objects
class InputMultiStream()
A class that represents a collection of input streams.
Arguments
- aggregator (
Callable[[None], StreamData]
): a function that takes a list of data and returns a single data point
- callback (
Callable[[StreamData], None]
): a function that gets called when data is received from a stream
Examples
>>> from darcyai.input.input_multi_stream import InputMultiStream
>>> from darcyai.stream_data import StreamData
>>> def aggregator():
...     return StreamData("data", 1234567890)
>>> def callback(data: StreamData):
...     print(data.data, data.timestamp)
>>> input_stream = InputMultiStream(aggregator, callback)
remove_stream
def remove_stream(name: str) -> None
Removes a stream from the collection.
Arguments
- name (
str
): the name of the stream to remove
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.input.input_multi_stream import InputMultiStream
>>> usb_camera = CameraStream(video_device="/dev/video0")
>>> input_stream = InputMultiStream(aggregator, callback)
>>> input_stream.add_stream("usb_camera", usb_camera)
>>> input_stream.remove_stream("usb_camera")
get_stream
def get_stream(name: str) -> InputStream
Gets a stream from the collection.
Arguments
- name (
str
): the name of the stream to get
Returns
InputStream
: the stream with the given name
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.input.input_multi_stream import InputMultiStream
>>> usb_camera = CameraStream(video_device="/dev/video0")
>>> input_stream = InputMultiStream(aggregator, callback)
>>> input_stream.add_stream("usb_camera", usb_camera)
>>> input_stream.get_stream("usb_camera")
add_stream
def add_stream(name: str, stream: InputStream) -> None
Adds a stream to the collection.
Arguments
- name (
str
): the name of the stream to add
- stream (
InputStream
): the stream to add
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.input.input_multi_stream import InputMultiStream
>>> usb_camera = CameraStream(video_device="/dev/video0")
>>> input_stream = InputMultiStream(aggregator, callback)
>>> input_stream.add_stream("usb_camera", usb_camera)
stream
def stream() -> Iterable[StreamData]
Starts streaming data from all streams in the collection.
Returns
Iterable[StreamData]
: an iterable of data from all streams
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.input.input_multi_stream import InputMultiStream
>>> usb_camera = CameraStream(video_device="/dev/video0")
>>> input_stream = InputMultiStream(aggregator, callback)
>>> input_stream.add_stream("usb_camera", usb_camera)
>>> input_stream.stream()
stop
def stop() -> None
Stops streaming data from all streams in the collection.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.input.input_multi_stream import InputMultiStream
>>> usb_camera = CameraStream(video_device="/dev/video0")
>>> input_stream = InputMultiStream(aggregator, callback)
>>> input_stream.add_stream("usb_camera", usb_camera)
>>> input_stream.stop()