darcyai.pipeline
Pipeline Objects
class Pipeline()
The Pipeline class is the main class of the darcyai package.
Arguments
- input_stream (
InputStream): The input stream to be used by the pipeline. - input_data_history_len (
int): The number of input data items to be stored in the history. Defaults to1. - pom_history_len (
int): The number of POM items to be stored in the history. Defaults to1. - metrics_history_len (
int): The number of metrics items to be stored in the history. Defaults to1. - num_of_edge_tpus (
int): The number of Edge TPUs. Defaults to1. - perceptor_error_handler_callback (
Callable[[str, Exception], None]): The callback function to be called when a Perceptor throws an exception. Defaults toNone. - output_stream_error_handler_callback (
Callable[[str, Exception], None]): The callback function to be called when an OutputStream throws an exception. Defaults toNone. - input_stream_error_handler_callback (
Callable[[Exception], None]): The callback function to be called when an InputStream throws an exception. Defaults toNone. - perception_completion_callback (
Callable[[PerceptionObjectModel], None]): The callback function to be called when all the perceptors have completed processing. Defaults toNone. - universal_rest_api (
bool): Whether or not to use the universal REST API. Defaults toFalse. - rest_api_base_path (
str): The base path of the REST API. Defaults to/. - rest_api_flask_app (
Flask): The Flask application to be used by the REST API. Defaults toNone. - rest_api_port (
int): The port of the REST API. Defaults to5000. - rest_api_host (
str): The host of the REST API. Defaults tolocalhost. 
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera,
...                     input_data_history_len=10,
...                     pom_history_len=10,
...                     metrics_history_len=10,
...                     num_of_edge_tpus=1,
...                     perceptor_error_handler_callback=None,
...                     output_stream_error_handler_callback=None,
...                     input_stream_error_handler_callback=None,
...                     perception_completion_callback=None,
...                     pulse_completion_callback=None,
...                     universal_rest_api=True,
...                     rest_api_base_path="/",
...                     rest_api_flask_app=None,
...                     rest_api_port=5000,
...                     rest_api_host="localhost")
num_of_edge_tpus
def num_of_edge_tpus() -> int
Gets the number of Edge TPUs in the pipeline.
Returns
int: The number of Edge TPUs in the pipeline.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.num_of_edge_tpus()
add_perceptor
def add_perceptor(name: str,
                  perceptor: Perceptor,
                  input_callback: Callable[
                      [StreamData, PerceptionObjectModel, ConfigRegistry],
                      Any] = None,
                  output_callback: Callable[
                      [Any, PerceptionObjectModel, ConfigRegistry],
                      Any] = None,
                  parent: str = None,
                  multi: bool = False,
                  accelerator_idx: int = 0,
                  default_config: Dict[str, Any] = None) -> None
Adds a new Perceptor to the pipeline.
Arguments
- name (
str): The name of the Perceptor (must be a valid variable name). - perceptor (
Perceptor): The Perceptor to be added. - input_callback (
Callable[[StreamData, PerceptionObjectModel, ConfigRegistry], Any]): The callback function to be called when the Perceptor receives input data. Defaults toNone. - output_callback (
Callable[[Any, PerceptionObjectModel, ConfigRegistry], Any]): The callback function to be called when the Perceptor produces output data. Defaults toNone. - parent (
str): The name of the parent Perceptor. Defaults toNone. - multi (
bool): Whether or not to run the perceptor for each item in input data. Defaults toFalse. - accelerator_idx (
int): The index of the Edge TPU to be used by the Perceptor. Defaults to0. - default_config (
Dict[str, Any]): The default configuration for the Perceptor. Defaults toNone. 
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.add_perceptor(name="perceptor",
...                        perceptor=MyPerceptor(),
...                        input_callback=None,
...                        output_callback=None,
...                        parent="input_stream",
...                        multi=True,
...                        accelerator_idx=0,
...                        default_config={"key": "value"})
add_perceptor_before
def add_perceptor_before(
        name_to_insert_before: str,
        name: str,
        perceptor: Perceptor,
        input_callback: Callable[
            [StreamData, PerceptionObjectModel, ConfigRegistry], Any] = None,
        output_callback: Callable[[Any, PerceptionObjectModel, ConfigRegistry],
                                  Any] = None,
        multi: bool = False,
        accelerator_idx: int = 0,
        default_config: dict = None) -> None
Adds a new Perceptor to the pipeline.
Arguments
- name_to_insert_before (
str): The name of the Perceptor to insert the new Perceptor before. - name (
str): The name of the Perceptor. - perceptor (
Perceptor): The Perceptor to be added. - input_callback (
Callable[[StreamData, PerceptionObjectModel, ConfigRegistry], Any]): The callback function to be called when the Perceptor receives input data. Defaults toNone. - output_callback (
Callable[[Any, PerceptionObjectModel, ConfigRegistry], Any]): The callback function to be called when the Perceptor produces output data. Defaults toNone. - multi (
bool): Whether or not to run the perceptor for each item in input data. Defaults toFalse. - accelerator_idx (
int): The index of the Edge TPU to be used by the Perceptor. Defaults to0. - default_config (
dict): The default configuration for the Perceptor. Defaults toNone. 
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.add_perceptor_before(name="perceptor",
...                               name_to_insert_before="child_input_stream",
...                               perceptor=MyPerceptor(),
...                               input_callback=None,
...                               output_callback=None,
...                               multi=True,
...                               accelerator_idx=0,
...                               default_config={"key": "value"})
add_perceptor_after
def add_perceptor_after(
        name_to_insert_after: str,
        name: str,
        perceptor: Perceptor,
        input_callback: Callable[
            [StreamData, PerceptionObjectModel, ConfigRegistry], Any] = None,
        output_callback: Callable[[Any, PerceptionObjectModel, ConfigRegistry],
                                  Any] = None,
        multi: bool = False,
        accelerator_idx: int = 0,
        default_config: dict = None) -> None
Adds a new Perceptor to the pipeline.
Arguments
- name_to_insert_after (
str): The name of the Perceptor to insert the new Perceptor after. - name (
str): The name of the Perceptor. - perceptor (
Perceptor): The Perceptor to be added. - input_callback (
Callable[[StreamData, PerceptionObjectModel, Any], ConfigRegistry]): The callback function to be called when the Perceptor receives input data. Defaults toNone. - output_callback (
Callable[[Any, PerceptionObjectModel, ConfigRegistry], Any]): The callback function to be called when the Perceptor produces output data. Defaults toNone. - multi (
bool): Whether or not to run the perceptor for each item in input data. Defaults toFalse. - accelerator_idx (
int): The index of the Edge TPU to be used by the Perceptor. Defaults to0. - default_config (
dict): The default configuration for the Perceptor. Defaults toNone. 
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.add_perceptor_after(name="perceptor",
...                               name_to_insert_after="parent_input_stream",
...                               perceptor=MyPerceptor(),
...                               input_callback=None,
...                               output_callback=None,
...                               multi=True,
...                               accelerator_idx=0,
...                               default_config={"key": "value"})
add_parallel_perceptor
def add_parallel_perceptor(
        name_to_insert_in_parallel_with: str,
        name: str,
        perceptor: Perceptor,
        input_callback: Callable[
            [StreamData, PerceptionObjectModel, ConfigRegistry], Any] = None,
        output_callback: Callable[[Any, PerceptionObjectModel, ConfigRegistry],
                                  Any] = None,
        multi: bool = False,
        accelerator_idx: int = 0,
        default_config: dict = None) -> None
Adds a new Perceptor to the pipeline.
Arguments
- name_to_insert_in_parallel_with (
str): The name of the Perceptor to insert the new Perceptor in parallel with. - name (
str): The name of the Perceptor. - perceptor (
Perceptor): The Perceptor to be added. - input_callback (
Callable[[StreamData, PerceptionObjectModel, ConfigRegistry], Any]): The callback function to be called when the Perceptor receives input data. Defaults toNone. - output_callback (
Callable[[Any, PerceptionObjectModel, ConfigRegistry], Any]): The callback function to be called when the Perceptor produces output data. Defaults toNone. - multi (
bool): Whether or not to run the perceptor for each item in input data. Defaults toFalse. - accelerator_idx (
int): The index of the Edge TPU to be used by the Perceptor. Defaults toNone. - default_config (
dict): The default configuration for the Perceptor. Defaults toNone. 
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.add_parallel_perceptor(name="perceptor",
...                                 name_to_insert_in_parallel_with="parallel_input_stream",
...                                 perceptor=MyPerceptor(),
...                                 input_callback=None,
...                                 output_callback=None,
...                                 multi=True,
...                                 accelerator_idx=0,
...                                 default_config={"key": "value"})
update_input_stream
def update_input_stream(input_stream: InputStream) -> None
Updates the input stream of the pipeline.
Arguments
- input_stream (
InputStream): The input stream to be added. 
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.update_input_stream(camera)
<a id="darcyai.pipeline.Pipeline.add_output_stream"></a>
#### add\_output\_stream
```python
def add_output_stream(name: str,
                      callback: Callable[[PerceptionObjectModel, StreamData],
                                         Any],
                      output_stream: OutputStream,
                      default_config: dict = None) -> None
Adds an OutputStream to the pipeline.
Arguments
- name (
str): The name of the OutputStream. - callback (
Callable[[PerceptionObjectModel, StreamData], Any]): A callback function that is called whith PerceptionObjectModel object and returns the data that the output stream must process. - output_stream (
OutputStream): The OutputStream to be added. - default_config (
dict): The default configuration for the OutputStream. Defaults toNone. 
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.add_output_stream(name="output_stream",
...                            callback=None,
...                            output_stream=MyOutputStream(),
...                            default_config={"key": "value"})
remove_output_stream
def remove_output_stream(name: str) -> None
Removes an OutputStream from the pipeline.
Arguments
- name (
str): The name of the OutputStream to be removed. 
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)\
>>> pipeline.add_output_stream(name="output_stream",
...                            callback=None,
...                            output_stream=MyOutputStream(),
...                            default_config={"key": "value"})
>>> pipeline.remove_output_stream(name="output_stream")
stop
def stop() -> None
Stops the pipeline.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.stop()
run
def run() -> None
Runs the pipeline.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.run()
get_pom
def get_pom() -> PerceptionObjectModel
Gets the Perception Object Model.
Returns
PerceptionObjectModel: The Perception Object Model.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pom = pipeline.get_pom()
get_current_pulse_number
def get_current_pulse_number() -> int
Gets the current pulse number.
Returns
int: The current pulse number.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pulse_number = pipeline.get_current_pulse_number()
get_latest_input
def get_latest_input() -> StreamData
Gets the latest input data.
Returns
StreamData: The latest input data.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> latest_input = pipeline.get_latest_input()
get_historical_input
def get_historical_input(pulse_number: int) -> StreamData
Gets the input data from the history.
Arguments
- pulse_number (
int): The pulse number. 
Returns
StreamData: The input data from the history.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> historical_input = pipeline.get_historical_input(pulse_number=1)
get_input_history
def get_input_history() -> Dict[int, StreamData]
Gets the input data history.
Returns
Dict[int, StreamData] - The input data history.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> input_history = pipeline.get_input_history()
get_historical_pom
def get_historical_pom(pulse_number: int) -> PerceptionObjectModel
Gets the POM from the history.
Arguments
- pulse_number (
int): The pulse number. 
Returns
PerceptionObjectModel: The POM from the history.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> historical_pom = pipeline.get_historical_pom(pulse_number=1)
get_pom_history
def get_pom_history() -> Dict[int, PerceptionObjectModel]
Gets the POM history.
Returns
Dict[int, PerceptionObjectModel] - The POM history.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pom_history = pipeline.get_pom_history()
run_perceptor
def run_perceptor(perceptor: Perceptor,
                  input_data: Any,
                  multi: bool = False) -> Any
Runs the Perceptor.
Arguments
- perceptor (
Perceptor): The Perceptor to be run. - input_data (
Any): The input data. - multi (
bool): Whether or not to run the perceptor for each item in input data. Defaults toFalse. 
Returns
Any: The result of running the Perceptor.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> result = pipeline.run_perceptor(perceptor=Perceptor(), input_data=None, multi=True)
get_graph
def get_graph() -> Any
Gets the graph of the perceptors.
Returns
Any: The graph of the perceptors.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> graph = pipeline.get_graph()
get_all_performance_metrics
def get_all_performance_metrics() -> Dict[str, Any]
Gets the performance metrics of the pipeline.
Returns
Dict[str, Any] - The performance metrics of the pipeline.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> metrics = pipeline.get_all_performance_metrics()
get_pulse_performance_metrics
def get_pulse_performance_metrics(
        pulse_number: Union[int, None] = None) -> Dict[str, Any]
Gets the performance metrics of the pipeline for specific pulse.
Arguments
- pulse_number (
int): The pulse number of the pulse. Defaults to current pulse. Defaults toNone. 
Returns
Dict[str, Any] - The performance metrics of the pipeline for specific pulse.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> metrics = pipeline.get_pulse_performance_metrics(pulse_number=1)
get_perceptor_performance_metrics
def get_perceptor_performance_metrics(name: str,
                                      pulse_number: Union[int, None] = None
                                      ) -> Dict[str, Any]
Gets the performance metrics of the pipeline for specific perceptor.
Arguments
- name (
str): The name of the perceptor. - pulse_number (
int): The pulse number of the pulse. Defaults to current pulse. Defaults toNone. 
Returns
Dict[str, Any] - The performance metrics of the pipeline for specific perceptor.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> metrics = pipeline.get_perceptor_performance_metrics(name="perceptor_name",
...                                                      pulse_number=1)
set_perceptor_config
def set_perceptor_config(perceptor_name: str, name: str, value: Any) -> None
Sets the config of the pipeline.
Arguments
- perceptor_name (
str): The name of the perceptor. - name (
str): The name of the config. - value (
Any): The value of the config. 
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.set_perceptor_config(perceptor_name="perceptor_name",
...                               name="config_name",
...                               value=1)
get_perceptor_config
def get_perceptor_config(perceptor_name: str) -> Dict[str, Tuple[Any, Config]]
Gets the config of the perceptor.
Arguments
- perceptor_name (
str): The name of the perceptor. 
Returns
Dict[str, Tuple[Any, Config]] - The config of the perceptor.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> config = pipeline.get_perceptor_config(perceptor_name="perceptor_name")
set_output_stream_config
def set_output_stream_config(name: str, config_name: str, value: Any) -> None
Sets the config of the output stream.
Arguments
- name (
str): The name of the output stream. - config_name (
str): The name of the config. - value (
Any): The value of the config. 
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.set_output_stream_config(name="output_stream_name",
...                                   config_name="config_name",
...                                   value=1)
get_output_stream_config
def get_output_stream_config(name: str) -> Dict[str, Tuple[Any, Config]]
Gets the config of the output stream.
Arguments
- name (
str): The name of the output stream. 
Returns
Dict[str, Tuple[Any, Config]] - The config of the output stream.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> config = pipeline.get_output_stream_config(name="output_stream_name")