darcyai.pipeline
Pipeline Objects
class Pipeline()
The Pipeline class is the main class of the darcyai package.
Arguments
- input_stream (InputStream): The input stream to be used by the pipeline.
- input_data_history_len (int): The number of input data items to be stored in the history. Defaults to `1`.
- pom_history_len (int): The number of POM items to be stored in the history. Defaults to `1`.
- metrics_history_len (int): The number of metrics items to be stored in the history. Defaults to `1`.
- num_of_edge_tpus (int): The number of Edge TPUs. Defaults to `1`.
- perceptor_error_handler_callback (Callable[[str, Exception], None]): The callback function to be called when a Perceptor throws an exception. Defaults to `None`.
- output_stream_error_handler_callback (Callable[[str, Exception], None]): The callback function to be called when an OutputStream throws an exception. Defaults to `None`.
- input_stream_error_handler_callback (Callable[[Exception], None]): The callback function to be called when an InputStream throws an exception. Defaults to `None`.
- perception_completion_callback (Callable[[PerceptionObjectModel], None]): The callback function to be called when all the perceptors have completed processing. Defaults to `None`.
- universal_rest_api (bool): Whether or not to use the universal REST API. Defaults to `False`.
- rest_api_base_path (str): The base path of the REST API. Defaults to `/`.
- rest_api_flask_app (Flask): The Flask application to be used by the REST API. Defaults to `None`.
- rest_api_port (int): The port of the REST API. Defaults to `5000`.
- rest_api_host (str): The host of the REST API. Defaults to `localhost`.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera,
... input_data_history_len=10,
... pom_history_len=10,
... metrics_history_len=10,
... num_of_edge_tpus=1,
... perceptor_error_handler_callback=None,
... output_stream_error_handler_callback=None,
... input_stream_error_handler_callback=None,
... perception_completion_callback=None,
... pulse_completion_callback=None,
... universal_rest_api=True,
... rest_api_base_path="/",
... rest_api_flask_app=None,
... rest_api_port=5000,
... rest_api_host="localhost")
num_of_edge_tpus
def num_of_edge_tpus() -> int
Gets the number of Edge TPUs in the pipeline.
Returns
int
: The number of Edge TPUs in the pipeline.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.num_of_edge_tpus()
add_perceptor
def add_perceptor(name: str,
perceptor: Perceptor,
input_callback: Callable[
[StreamData, PerceptionObjectModel, ConfigRegistry],
Any] = None,
output_callback: Callable[
[Any, PerceptionObjectModel, ConfigRegistry],
Any] = None,
parent: str = None,
multi: bool = False,
accelerator_idx: int = 0,
default_config: Dict[str, Any] = None) -> None
Adds a new Perceptor to the pipeline.
Arguments
- name (
str
): The name of the Perceptor (must be a valid variable name). - perceptor (
Perceptor
): The Perceptor to be added. - input_callback (
Callable[[StreamData, PerceptionObjectModel, ConfigRegistry], Any]
): The callback function to be called when the Perceptor receives input data. Defaults toNone
. - output_callback (
Callable[[Any, PerceptionObjectModel, ConfigRegistry], Any]
): The callback function to be called when the Perceptor produces output data. Defaults toNone
. - parent (
str
): The name of the parent Perceptor. Defaults toNone
. - multi (
bool
): Whether or not to run the perceptor for each item in input data. Defaults toFalse
. - accelerator_idx (
int
): The index of the Edge TPU to be used by the Perceptor. Defaults to0
. - default_config (
Dict[str, Any]
): The default configuration for the Perceptor. Defaults toNone
.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.add_perceptor(name="perceptor",
... perceptor=MyPerceptor(),
... input_callback=None,
... output_callback=None,
... parent="input_stream",
... multi=True,
... accelerator_idx=0,
... default_config={"key": "value"})
add_perceptor_before
def add_perceptor_before(
name_to_insert_before: str,
name: str,
perceptor: Perceptor,
input_callback: Callable[
[StreamData, PerceptionObjectModel, ConfigRegistry], Any] = None,
output_callback: Callable[[Any, PerceptionObjectModel, ConfigRegistry],
Any] = None,
multi: bool = False,
accelerator_idx: int = 0,
default_config: dict = None) -> None
Adds a new Perceptor to the pipeline, inserting it before an existing Perceptor.
Arguments
- name_to_insert_before (
str
): The name of the Perceptor to insert the new Perceptor before. - name (
str
): The name of the Perceptor. - perceptor (
Perceptor
): The Perceptor to be added. - input_callback (
Callable[[StreamData, PerceptionObjectModel, ConfigRegistry], Any]
): The callback function to be called when the Perceptor receives input data. Defaults toNone
. - output_callback (
Callable[[Any, PerceptionObjectModel, ConfigRegistry], Any]
): The callback function to be called when the Perceptor produces output data. Defaults toNone
. - multi (
bool
): Whether or not to run the perceptor for each item in input data. Defaults toFalse
. - accelerator_idx (
int
): The index of the Edge TPU to be used by the Perceptor. Defaults to0
. - default_config (
dict
): The default configuration for the Perceptor. Defaults toNone
.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.add_perceptor_before(name="perceptor",
... name_to_insert_before="child_input_stream",
... perceptor=MyPerceptor(),
... input_callback=None,
... output_callback=None,
... multi=True,
... accelerator_idx=0,
... default_config={"key": "value"})
add_perceptor_after
def add_perceptor_after(
name_to_insert_after: str,
name: str,
perceptor: Perceptor,
input_callback: Callable[
[StreamData, PerceptionObjectModel, ConfigRegistry], Any] = None,
output_callback: Callable[[Any, PerceptionObjectModel, ConfigRegistry],
Any] = None,
multi: bool = False,
accelerator_idx: int = 0,
default_config: dict = None) -> None
Adds a new Perceptor to the pipeline, inserting it after an existing Perceptor.
Arguments
- name_to_insert_after (
str
): The name of the Perceptor to insert the new Perceptor after. - name (
str
): The name of the Perceptor. - perceptor (
Perceptor
): The Perceptor to be added. - input_callback (
Callable[[StreamData, PerceptionObjectModel, ConfigRegistry], Any]
): The callback function to be called when the Perceptor receives input data. Defaults toNone
. - output_callback (
Callable[[Any, PerceptionObjectModel, ConfigRegistry], Any]
): The callback function to be called when the Perceptor produces output data. Defaults toNone
. - multi (
bool
): Whether or not to run the perceptor for each item in input data. Defaults toFalse
. - accelerator_idx (
int
): The index of the Edge TPU to be used by the Perceptor. Defaults to0
. - default_config (
dict
): The default configuration for the Perceptor. Defaults toNone
.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.add_perceptor_after(name="perceptor",
... name_to_insert_after="parent_input_stream",
... perceptor=MyPerceptor(),
... input_callback=None,
... output_callback=None,
... multi=True,
... accelerator_idx=0,
... default_config={"key": "value"})
add_parallel_perceptor
def add_parallel_perceptor(
name_to_insert_in_parallel_with: str,
name: str,
perceptor: Perceptor,
input_callback: Callable[
[StreamData, PerceptionObjectModel, ConfigRegistry], Any] = None,
output_callback: Callable[[Any, PerceptionObjectModel, ConfigRegistry],
Any] = None,
multi: bool = False,
accelerator_idx: int = 0,
default_config: dict = None) -> None
Adds a new Perceptor to the pipeline, inserting it in parallel with an existing Perceptor.
Arguments
- name_to_insert_in_parallel_with (
str
): The name of the Perceptor to insert the new Perceptor in parallel with. - name (
str
): The name of the Perceptor. - perceptor (
Perceptor
): The Perceptor to be added. - input_callback (
Callable[[StreamData, PerceptionObjectModel, ConfigRegistry], Any]
): The callback function to be called when the Perceptor receives input data. Defaults toNone
. - output_callback (
Callable[[Any, PerceptionObjectModel, ConfigRegistry], Any]
): The callback function to be called when the Perceptor produces output data. Defaults toNone
. - multi (
bool
): Whether or not to run the perceptor for each item in input data. Defaults toFalse
. - accelerator_idx (
int
): The index of the Edge TPU to be used by the Perceptor. Defaults to 0
. - default_config (
dict
): The default configuration for the Perceptor. Defaults toNone
.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.add_parallel_perceptor(name="perceptor",
... name_to_insert_in_parallel_with="parallel_input_stream",
... perceptor=MyPerceptor(),
... input_callback=None,
... output_callback=None,
... multi=True,
... accelerator_idx=0,
... default_config={"key": "value"})
update_input_stream
def update_input_stream(input_stream: InputStream) -> None
Updates the input stream of the pipeline.
Arguments
- input_stream (
InputStream
): The new input stream to be used by the pipeline.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.update_input_stream(camera)
add_output_stream
def add_output_stream(name: str,
callback: Callable[[PerceptionObjectModel, StreamData],
Any],
output_stream: OutputStream,
default_config: dict = None) -> None
Adds an OutputStream to the pipeline.
Arguments
- name (
str
): The name of the OutputStream. - callback (
Callable[[PerceptionObjectModel, StreamData], Any]
): A callback function that is called with the PerceptionObjectModel object and the StreamData, and returns the data that the output stream must process. - output_stream (
OutputStream
): The OutputStream to be added. - default_config (
dict
): The default configuration for the OutputStream. Defaults toNone
.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.add_output_stream(name="output_stream",
... callback=None,
... output_stream=MyOutputStream(),
... default_config={"key": "value"})
remove_output_stream
def remove_output_stream(name: str) -> None
Removes an OutputStream from the pipeline.
Arguments
- name (
str
): The name of the OutputStream to be removed.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.add_output_stream(name="output_stream",
... callback=None,
... output_stream=MyOutputStream(),
... default_config={"key": "value"})
>>> pipeline.remove_output_stream(name="output_stream")
stop
def stop() -> None
Stops the pipeline.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.stop()
run
def run() -> None
Runs the pipeline.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.run()
get_pom
def get_pom() -> PerceptionObjectModel
Gets the Perception Object Model.
Returns
PerceptionObjectModel
: The Perception Object Model.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pom = pipeline.get_pom()
get_current_pulse_number
def get_current_pulse_number() -> int
Gets the current pulse number.
Returns
int
: The current pulse number.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pulse_number = pipeline.get_current_pulse_number()
get_latest_input
def get_latest_input() -> StreamData
Gets the latest input data.
Returns
StreamData
: The latest input data.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> latest_input = pipeline.get_latest_input()
get_historical_input
def get_historical_input(pulse_number: int) -> StreamData
Gets the input data from the history.
Arguments
- pulse_number (
int
): The pulse number.
Returns
StreamData
: The input data from the history.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> historical_input = pipeline.get_historical_input(pulse_number=1)
get_input_history
def get_input_history() -> Dict[int, StreamData]
Gets the input data history.
Returns
Dict[int, StreamData]
- The input data history.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> input_history = pipeline.get_input_history()
get_historical_pom
def get_historical_pom(pulse_number: int) -> PerceptionObjectModel
Gets the POM from the history.
Arguments
- pulse_number (
int
): The pulse number.
Returns
PerceptionObjectModel
: The POM from the history.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> historical_pom = pipeline.get_historical_pom(pulse_number=1)
get_pom_history
def get_pom_history() -> Dict[int, PerceptionObjectModel]
Gets the POM history.
Returns
Dict[int, PerceptionObjectModel]
- The POM history.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pom_history = pipeline.get_pom_history()
run_perceptor
def run_perceptor(perceptor: Perceptor,
input_data: Any,
multi: bool = False) -> Any
Runs the Perceptor.
Arguments
- perceptor (
Perceptor
): The Perceptor to be run. - input_data (
Any
): The input data. - multi (
bool
): Whether or not to run the perceptor for each item in input data. Defaults toFalse
.
Returns
Any
: The result of running the Perceptor.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> result = pipeline.run_perceptor(perceptor=Perceptor(), input_data=None, multi=True)
get_graph
def get_graph() -> Any
Gets the graph of the perceptors.
Returns
Any
: The graph of the perceptors.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> graph = pipeline.get_graph()
get_all_performance_metrics
def get_all_performance_metrics() -> Dict[str, Any]
Gets the performance metrics of the pipeline.
Returns
Dict[str, Any]
- The performance metrics of the pipeline.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> metrics = pipeline.get_all_performance_metrics()
get_pulse_performance_metrics
def get_pulse_performance_metrics(
pulse_number: Union[int, None] = None) -> Dict[str, Any]
Gets the performance metrics of the pipeline for specific pulse.
Arguments
- pulse_number (
int
): The pulse number. Defaults to None, which selects the current pulse.
Returns
Dict[str, Any]
- The performance metrics of the pipeline for specific pulse.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> metrics = pipeline.get_pulse_performance_metrics(pulse_number=1)
get_perceptor_performance_metrics
def get_perceptor_performance_metrics(name: str,
pulse_number: Union[int, None] = None
) -> Dict[str, Any]
Gets the performance metrics of the pipeline for specific perceptor.
Arguments
- name (
str
): The name of the perceptor. - pulse_number (
int
): The pulse number. Defaults to None, which selects the current pulse.
Returns
Dict[str, Any]
- The performance metrics of the pipeline for specific perceptor.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> metrics = pipeline.get_perceptor_performance_metrics(name="perceptor_name",
... pulse_number=1)
set_perceptor_config
def set_perceptor_config(perceptor_name: str, name: str, value: Any) -> None
Sets the config of the perceptor.
Arguments
- perceptor_name (
str
): The name of the perceptor. - name (
str
): The name of the config. - value (
Any
): The value of the config.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.set_perceptor_config(perceptor_name="perceptor_name",
... name="config_name",
... value=1)
get_perceptor_config
def get_perceptor_config(perceptor_name: str) -> Dict[str, Tuple[Any, Config]]
Gets the config of the perceptor.
Arguments
- perceptor_name (
str
): The name of the perceptor.
Returns
Dict[str, Tuple[Any, Config]]
- The config of the perceptor.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> config = pipeline.get_perceptor_config(perceptor_name="perceptor_name")
set_output_stream_config
def set_output_stream_config(name: str, config_name: str, value: Any) -> None
Sets the config of the output stream.
Arguments
- name (
str
): The name of the output stream. - config_name (
str
): The name of the config. - value (
Any
): The value of the config.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> pipeline.set_output_stream_config(name="output_stream_name",
... config_name="config_name",
... value=1)
get_output_stream_config
def get_output_stream_config(name: str) -> Dict[str, Tuple[Any, Config]]
Gets the config of the output stream.
Arguments
- name (
str
): The name of the output stream.
Returns
Dict[str, Tuple[Any, Config]]
- The config of the output stream.
Examples
>>> from darcyai.input.camera_stream import CameraStream
>>> from darcyai.pipeline import Pipeline
>>> camera = CameraStream(video_device="/dev/video0")
>>> pipeline = Pipeline(input_stream=camera)
>>> config = pipeline.get_output_stream_config(name="output_stream_name")