darcyai.pipeline

Pipeline Objects

class Pipeline()

The Pipeline class is the main class of the darcyai package.

Arguments

  • input_stream (InputStream): The input stream to be used by the pipeline.
  • input_data_history_len (int): The number of input data items to be stored in the history. Defaults to 1.
  • pom_history_len (int): The number of POM items to be stored in the history. Defaults to 1.
  • metrics_history_len (int): The number of metrics items to be stored in the history. Defaults to 1.
  • num_of_edge_tpus (int): The number of Edge TPUs. Defaults to 1.
  • perceptor_error_handler_callback (Callable[[str, Exception], None]): The callback function to be called when a Perceptor throws an exception. Defaults to None.
  • output_stream_error_handler_callback (Callable[[str, Exception], None]): The callback function to be called when an OutputStream throws an exception. Defaults to None.
  • input_stream_error_handler_callback (Callable[[Exception], None]): The callback function to be called when an InputStream throws an exception. Defaults to None.
  • perception_completion_callback (Callable[[PerceptionObjectModel], None]): The callback function to be called when all the perceptors have completed processing. Defaults to None.
  • universal_rest_api (bool): Whether or not to use the universal REST API. Defaults to False.
  • rest_api_base_path (str): The base path of the REST API. Defaults to /.
  • rest_api_flask_app (Flask): The Flask application to be used by the REST API. Defaults to None.
  • rest_api_port (int): The port of the REST API. Defaults to 5000.
  • rest_api_host (str): The host of the REST API. Defaults to localhost.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera,
...                     input_data_history_len=10,
...                     pom_history_len=10,
...                     metrics_history_len=10,
...                     num_of_edge_tpus=1,
...                     perceptor_error_handler_callback=None,
...                     output_stream_error_handler_callback=None,
...                     input_stream_error_handler_callback=None,
...                     perception_completion_callback=None,
...                     pulse_completion_callback=None,
...                     universal_rest_api=True,
...                     rest_api_base_path="/",
...                     rest_api_flask_app=None,
...                     rest_api_port=5000,
...                     rest_api_host="localhost")

num_of_edge_tpus

def num_of_edge_tpus() -> int

Gets the number of Edge TPUs in the pipeline.

Returns

int: The number of Edge TPUs in the pipeline.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.num_of_edge_tpus()

add_perceptor

def add_perceptor(name: str,
                  perceptor: Perceptor,
                  input_callback: Callable[
                      [StreamData, PerceptionObjectModel, ConfigRegistry],
                      Any] = None,
                  output_callback: Callable[
                      [Any, PerceptionObjectModel, ConfigRegistry],
                      Any] = None,
                  parent: str = None,
                  multi: bool = False,
                  accelerator_idx: int = 0,
                  default_config: Dict[str, Any] = None) -> None

Adds a new Perceptor to the pipeline.

Arguments

  • name (str): The name of the Perceptor (must be a valid variable name).
  • perceptor (Perceptor): The Perceptor to be added.
  • input_callback (Callable[[StreamData, PerceptionObjectModel, ConfigRegistry], Any]): The callback function to be called when the Perceptor receives input data. Defaults to None.
  • output_callback (Callable[[Any, PerceptionObjectModel, ConfigRegistry], Any]): The callback function to be called when the Perceptor produces output data. Defaults to None.
  • parent (str): The name of the parent Perceptor. Defaults to None.
  • multi (bool): Whether or not to run the perceptor for each item in input data. Defaults to False.
  • accelerator_idx (int): The index of the Edge TPU to be used by the Perceptor. Defaults to 0.
  • default_config (Dict[str, Any]): The default configuration for the Perceptor. Defaults to None.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.add_perceptor(name="perceptor",
...                        perceptor=MyPerceptor(),
...                        input_callback=None,
...                        output_callback=None,
...                        parent="input_stream",
...                        multi=True,
...                        accelerator_idx=0,
...                        default_config={"key": "value"})

add_perceptor_before

def add_perceptor_before(
        name_to_insert_before: str,
        name: str,
        perceptor: Perceptor,
        input_callback: Callable[
            [StreamData, PerceptionObjectModel, ConfigRegistry], Any] = None,
        output_callback: Callable[[Any, PerceptionObjectModel, ConfigRegistry],
                                  Any] = None,
        multi: bool = False,
        accelerator_idx: int = 0,
        default_config: dict = None) -> None

Adds a new Perceptor to the pipeline, inserting it before the specified Perceptor.

Arguments

  • name_to_insert_before (str): The name of the Perceptor to insert the new Perceptor before.
  • name (str): The name of the Perceptor.
  • perceptor (Perceptor): The Perceptor to be added.
  • input_callback (Callable[[StreamData, PerceptionObjectModel, ConfigRegistry], Any]): The callback function to be called when the Perceptor receives input data. Defaults to None.
  • output_callback (Callable[[Any, PerceptionObjectModel, ConfigRegistry], Any]): The callback function to be called when the Perceptor produces output data. Defaults to None.
  • multi (bool): Whether or not to run the perceptor for each item in input data. Defaults to False.
  • accelerator_idx (int): The index of the Edge TPU to be used by the Perceptor. Defaults to 0.
  • default_config (dict): The default configuration for the Perceptor. Defaults to None.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.add_perceptor_before(name="perceptor",
...                               name_to_insert_before="child_input_stream",
...                               perceptor=MyPerceptor(),
...                               input_callback=None,
...                               output_callback=None,
...                               multi=True,
...                               accelerator_idx=0,
...                               default_config={"key": "value"})

add_perceptor_after

def add_perceptor_after(
        name_to_insert_after: str,
        name: str,
        perceptor: Perceptor,
        input_callback: Callable[
            [StreamData, PerceptionObjectModel, ConfigRegistry], Any] = None,
        output_callback: Callable[[Any, PerceptionObjectModel, ConfigRegistry],
                                  Any] = None,
        multi: bool = False,
        accelerator_idx: int = 0,
        default_config: dict = None) -> None

Adds a new Perceptor to the pipeline, inserting it after the specified Perceptor.

Arguments

  • name_to_insert_after (str): The name of the Perceptor to insert the new Perceptor after.
  • name (str): The name of the Perceptor.
  • perceptor (Perceptor): The Perceptor to be added.
  • input_callback (Callable[[StreamData, PerceptionObjectModel, ConfigRegistry], Any]): The callback function to be called when the Perceptor receives input data. Defaults to None.
  • output_callback (Callable[[Any, PerceptionObjectModel, ConfigRegistry], Any]): The callback function to be called when the Perceptor produces output data. Defaults to None.
  • multi (bool): Whether or not to run the perceptor for each item in input data. Defaults to False.
  • accelerator_idx (int): The index of the Edge TPU to be used by the Perceptor. Defaults to 0.
  • default_config (dict): The default configuration for the Perceptor. Defaults to None.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.add_perceptor_after(name="perceptor",
...                               name_to_insert_after="parent_input_stream",
...                               perceptor=MyPerceptor(),
...                               input_callback=None,
...                               output_callback=None,
...                               multi=True,
...                               accelerator_idx=0,
...                               default_config={"key": "value"})

add_parallel_perceptor

def add_parallel_perceptor(
        name_to_insert_in_parallel_with: str,
        name: str,
        perceptor: Perceptor,
        input_callback: Callable[
            [StreamData, PerceptionObjectModel, ConfigRegistry], Any] = None,
        output_callback: Callable[[Any, PerceptionObjectModel, ConfigRegistry],
                                  Any] = None,
        multi: bool = False,
        accelerator_idx: int = 0,
        default_config: dict = None) -> None

Adds a new Perceptor to the pipeline, inserting it in parallel with the specified Perceptor.

Arguments

  • name_to_insert_in_parallel_with (str): The name of the Perceptor to insert the new Perceptor in parallel with.
  • name (str): The name of the Perceptor.
  • perceptor (Perceptor): The Perceptor to be added.
  • input_callback (Callable[[StreamData, PerceptionObjectModel, ConfigRegistry], Any]): The callback function to be called when the Perceptor receives input data. Defaults to None.
  • output_callback (Callable[[Any, PerceptionObjectModel, ConfigRegistry], Any]): The callback function to be called when the Perceptor produces output data. Defaults to None.
  • multi (bool): Whether or not to run the perceptor for each item in input data. Defaults to False.
  • accelerator_idx (int): The index of the Edge TPU to be used by the Perceptor. Defaults to 0.
  • default_config (dict): The default configuration for the Perceptor. Defaults to None.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.add_parallel_perceptor(name="perceptor",
...                                 name_to_insert_in_parallel_with="parallel_input_stream",
...                                 perceptor=MyPerceptor(),
...                                 input_callback=None,
...                                 output_callback=None,
...                                 multi=True,
...                                 accelerator_idx=0,
...                                 default_config={"key": "value"})

update_input_stream

def update_input_stream(input_stream: InputStream) -> None

Updates the input stream of the pipeline.

Arguments

  • input_stream (InputStream): The new input stream to be used by the pipeline.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.update_input_stream(camera)

add_output_stream

def add_output_stream(name: str,
                      callback: Callable[[PerceptionObjectModel, StreamData],
                                         Any],
                      output_stream: OutputStream,
                      default_config: dict = None) -> None

Adds an OutputStream to the pipeline.

Arguments

  • name (str): The name of the OutputStream.
  • callback (Callable[[PerceptionObjectModel, StreamData], Any]): A callback function that is called with the PerceptionObjectModel object and the StreamData, and returns the data that the output stream must process.
  • output_stream (OutputStream): The OutputStream to be added.
  • default_config (dict): The default configuration for the OutputStream. Defaults to None.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.add_output_stream(name="output_stream",
...                            callback=None,
...                            output_stream=MyOutputStream(),
...                            default_config={"key": "value"})

remove_output_stream

def remove_output_stream(name: str) -> None

Removes an OutputStream from the pipeline.

Arguments

  • name (str): The name of the OutputStream to be removed.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.add_output_stream(name="output_stream",
...                            callback=None,
...                            output_stream=MyOutputStream(),
...                            default_config={"key": "value"})
>>> pipeline.remove_output_stream(name="output_stream")

stop

def stop() -> None

Stops the pipeline.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.stop()

run

def run() -> None

Runs the pipeline.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.run()

get_pom

def get_pom() -> PerceptionObjectModel

Gets the Perception Object Model.

Returns

PerceptionObjectModel: The Perception Object Model.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pom = pipeline.get_pom()

get_current_pulse_number

def get_current_pulse_number() -> int

Gets the current pulse number.

Returns

int: The current pulse number.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pulse_number = pipeline.get_current_pulse_number()

get_latest_input

def get_latest_input() -> StreamData

Gets the latest input data.

Returns

StreamData: The latest input data.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> latest_input = pipeline.get_latest_input()

get_historical_input

def get_historical_input(pulse_number: int) -> StreamData

Gets the input data from the history.

Arguments

  • pulse_number (int): The pulse number.

Returns

StreamData: The input data from the history.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> historical_input = pipeline.get_historical_input(pulse_number=1)

get_input_history

def get_input_history() -> Dict[int, StreamData]

Gets the input data history.

Returns

Dict[int, StreamData] - The input data history.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> input_history = pipeline.get_input_history()

get_historical_pom

def get_historical_pom(pulse_number: int) -> PerceptionObjectModel

Gets the POM from the history.

Arguments

  • pulse_number (int): The pulse number.

Returns

PerceptionObjectModel: The POM from the history.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> historical_pom = pipeline.get_historical_pom(pulse_number=1)

get_pom_history

def get_pom_history() -> Dict[int, PerceptionObjectModel]

Gets the POM history.

Returns

Dict[int, PerceptionObjectModel] - The POM history.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pom_history = pipeline.get_pom_history()

run_perceptor

def run_perceptor(perceptor: Perceptor,
                  input_data: Any,
                  multi: bool = False) -> Any

Runs the Perceptor.

Arguments

  • perceptor (Perceptor): The Perceptor to be run.
  • input_data (Any): The input data.
  • multi (bool): Whether or not to run the perceptor for each item in input data. Defaults to False.

Returns

Any: The result of running the Perceptor.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> result = pipeline.run_perceptor(perceptor=MyPerceptor(), input_data=None, multi=True)

get_graph

def get_graph() -> Any

Gets the graph of the perceptors.

Returns

Any: The graph of the perceptors.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> graph = pipeline.get_graph()

get_all_performance_metrics

def get_all_performance_metrics() -> Dict[str, Any]

Gets the performance metrics of the pipeline.

Returns

Dict[str, Any] - The performance metrics of the pipeline.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> metrics = pipeline.get_all_performance_metrics()

get_pulse_performance_metrics

def get_pulse_performance_metrics(
        pulse_number: Union[int, None] = None) -> Dict[str, Any]

Gets the performance metrics of the pipeline for a specific pulse.

Arguments

  • pulse_number (int): The pulse number of the pulse. Defaults to None, in which case the current pulse is used.

Returns

Dict[str, Any] - The performance metrics of the pipeline for a specific pulse.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> metrics = pipeline.get_pulse_performance_metrics(pulse_number=1)

get_perceptor_performance_metrics

def get_perceptor_performance_metrics(name: str,
                                      pulse_number: Union[int, None] = None
                                      ) -> Dict[str, Any]

Gets the performance metrics of the pipeline for a specific perceptor.

Arguments

  • name (str): The name of the perceptor.
  • pulse_number (int): The pulse number of the pulse. Defaults to None, in which case the current pulse is used.

Returns

Dict[str, Any] - The performance metrics of the pipeline for a specific perceptor.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> metrics = pipeline.get_perceptor_performance_metrics(name="perceptor_name",
...                                                      pulse_number=1)

set_perceptor_config

def set_perceptor_config(perceptor_name: str, name: str, value: Any) -> None

Sets the config of the perceptor.

Arguments

  • perceptor_name (str): The name of the perceptor.
  • name (str): The name of the config.
  • value (Any): The value of the config.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.set_perceptor_config(perceptor_name="perceptor_name",
...                               name="config_name",
...                               value=1)

get_perceptor_config

def get_perceptor_config(perceptor_name: str) -> Dict[str, Tuple[Any, Config]]

Gets the config of the perceptor.

Arguments

  • perceptor_name (str): The name of the perceptor.

Returns

Dict[str, Tuple[Any, Config]] - The config of the perceptor.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> config = pipeline.get_perceptor_config(perceptor_name="perceptor_name")

set_output_stream_config

def set_output_stream_config(name: str, config_name: str, value: Any) -> None

Sets the config of the output stream.

Arguments

  • name (str): The name of the output stream.
  • config_name (str): The name of the config.
  • value (Any): The value of the config.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.set_output_stream_config(name="output_stream_name",
...                                   config_name="config_name",
...                                   value=1)

get_output_stream_config

def get_output_stream_config(name: str) -> Dict[str, Tuple[Any, Config]]

Gets the config of the output stream.

Arguments

  • name (str): The name of the output stream.

Returns

Dict[str, Tuple[Any, Config]] - The config of the output stream.

Examples

>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline

>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> config = pipeline.get_output_stream_config(name="output_stream_name")