5. TML Solution Templates
TML solution templates are designed to dramatically accelerate TML solution solution in a few minutes, when it would normally take companies weeks or months to build end-end real-time solutions at scale, with integrations with advanced machine learning, GenAI, automated docker container deployments and automated documentation, with automated Github code commits.
Solution templated require NO CODE or Configurations. Just RUN THEM in TSS! An example will be shown to make this easy to understand here Running A Solution Container
Important
ALL TML SOLUTIONS MUST BE DEVELOPED USING “COPIES” OF THESE SOLUTION TEMPLATES IN YOUR TML PROJECT FOLDER. Copies of these solution templates are automatically made for you when you create a TML project: Refer to here for details Lets Start Building a TML Solution
These solution templates execute all of the TML DAGs here DAG Solution Process Explanation
All you do is configure the parameters in the TML DAGs and RUN the Solution Templates. THAT IS IT!
5.1. The Solution Template Naming Conventions
The namees of the solution template tell you what the solution is about. Each solution template performs functions:
processing - using ingested data from: local file, MQTT, gRPC, REST
ML/predictions - using ingested data from: local file, MQTT, gRPC, REST
AI - using ingested data from: local file, MQTT, gRPC, REST
5.1.1. For example:
1. solution_preprocessing_dag: Is doing ONLY preprocessing using LOCAL FILE because it does not use MQTT, REST, gRPC and defaults to local file
2. solution_preprocessing_dag_mqtt: Is doing preprocessing using MQTT
3. solution_preprocessing_ai_grpc_dag: Is doing preprocessing AND AI using gRPC
4. solution_preprocessing_ml_ai_mqtt: Is dong preprocessing, ML AND AI using MQTT
5. solution_preprocessing_ml_mqtt_dag: Is doing preprocessing, ML using MQTT
6. solution_preprocessing_ml_dag: Is doing preprocessing and ML using LOCAL File
So on…
5.2. Here are the solution templates provided
Solution Templates |
1. Solution Template: solution_template_processing_ai_dag_grpc.py This template will analyse ANY real-time data using the gRPC protocol. See gRPC Reference Architecture with integration with GenAI for real-time AI analysis of TML output data. |
2. Solution Template: solution_template_processing_ai_dag_mqtt.py This template will analyse ANY real-time data using the MQTT protocol. See MQTT Reference Architecture with integration with GenAI for real-time AI analysis of TML output data. |
3. Solution Template: solution_template_processing_ai_dag_restapi.py This template will analyse ANY real-time data using the REST protocol. See REST API Reference Architecture with integration with GenAI for real-time AI analysis of TML output data. |
4. Solution Template: solution_template_processing_ai_dag.py This solution template will read a local file from the file system and stream it to the TML solution for processing, with integration with GenAI for further processing. |
5. Solution Template: solution_template_processing_dag_grpc.py This solution template will process data ingested using gRPC. |
6. Solution Template: solution_template_processing_dag_mqtt.py This solution template will process data using MQTT protocol. |
7. Solution Template: solution_template_processing_dag_restapi.py This solution template will process data using the REST API. |
8. Solution Template: solution_template_processing_dag.py This solution template will process data using local file. |
9. Solution Template: solution_template_processing_ml_ai_dag_grpc.py This solution template will process data, perform machine learning and perform AI on the output data, while ingesting data from gRPC protocol. |
10. Solution Template: solution_template_processing_ml_ai_dag_mqtt.py This solution template will process data, perform machine learning and perform AI on the output data, while ingesting data from MQTT protocol. |
11. Solution Template: solution_template_processing_ml_ai_dag_restapi.py This solution template will process data, perform machine learning and perform AI on the output data, while ingesting data from REST API protocol. |
12. Solution Template: solution_template_processing_ml_ai_dag.py This solution template will process data, perform machine learning and perform AI on the output data, while ingesting data from local file. |
13. Solution Template: solution_template_processing_ml_dag_grpc.py This solution template will process data, perform machine learning and predictions while ingesting data from gRPC protocol. |
14. Solution Template: solution_template_processing_ml_dag_mqtt.py This solution template will process data, perform machine learning and predictions while ingesting data from MQTT protocol. |
15. Solution Template: solution_template_processing_ml_dag_restapi.py This solution template will process data, perform machine learning and predictions while ingesting data from REST API protocol. |
16. Solution Template: solution_template_processing_ml_dag.py This solution template will process data, perform machine learning and predictions while ingesting data from local file. |
5.3. 1. Solution Template: solution_template_processing_ai_dag_grpc.py
from __future__ import annotations
import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
ExternalPythonOperator,
PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_gRPC_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")
with DAG(
dag_id="solution_preprocessing_ai_grpc_dag",
start_date=datetime(2023, 1, 1),
schedule=None,
) as dag:
start_task = BashOperator(
task_id="start_tasks_tml_preprocessing_ai_grpc",
bash_command="echo 'Start task'",
)
# STEP 1: Get the Parameters
sensor_A = PythonOperator(
task_id="step_1_solution_task_getparams",
python_callable=step1.getparams,
provide_context=True,
)
# STEP 2: Create the Kafka topics
sensor_B = PythonOperator(
task_id="step_2_solution_task_createtopic",
python_callable=step2.setupkafkatopics,
provide_context=True,
)
# STEP 3: Produce data to topic
sensor_C = PythonOperator(
task_id="step_3_solution_task_producetotopic",
python_callable=step3.startproducing,
provide_context=True,
)
# STEP 4: Preprocess the data
sensor_D = PythonOperator(
task_id="step_4_solution_task_preprocess",
python_callable=step4.dopreprocessing,
provide_context=True,
)
# STEP 7: Containerize the solution
sensor_E = PythonOperator(
task_id="step_7_solution_task_visualization",
python_callable=step7.startstreamingengine,
provide_context=True,
)
# STEP 8: Containerize the solution
sensor_F = PythonOperator(
task_id="step_8_solution_task_containerize",
python_callable=step8.dockerit,
provide_context=True,
op_args=['ai'],
)
# STEP 9: PrivateGPT
sensor_I = PythonOperator(
task_id="step_9_solution_task_ai",
python_callable=step9.startprivategpt,
provide_context=True,
)
start_task2 = BashOperator(
task_id="Starting_Docker",
bash_command="echo 'Start task Completed'",
)
start_task3 = BashOperator(
task_id="Starting_Documentation",
bash_command="echo 'Start task Completed'",
)
start_task4 = BashOperator(
task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
bash_command="echo 'Start task Completed'",
)
# STEP 10: Document the solution
sensor_G = PythonOperator(
task_id="step_10_solution_task_document",
python_callable=step10.generatedoc,
provide_context=True,
)
start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_I, sensor_C, sensor_D, sensor_E] >> start_task2 >> sensor_F >> start_task3 >> sensor_G
5.4. 2. Solution Template: solution_template_processing_ai_dag_mqtt.py
from __future__ import annotations
import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
ExternalPythonOperator,
PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_MQTT_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")
with DAG(
dag_id="solution_preprocessing_ai_mqtt_dag",
start_date=datetime(2023, 1, 1),
schedule=None,
) as dag:
start_task = BashOperator(
task_id="start_tasks_tml_preprocessing_ai_mqtt",
bash_command="echo 'Start task'",
)
# STEP 1: Get the Parameters
sensor_A = PythonOperator(
task_id="step_1_solution_task_getparams",
python_callable=step1.getparams,
provide_context=True,
)
# STEP 2: Create the Kafka topics
sensor_B = PythonOperator(
task_id="step_2_solution_task_createtopic",
python_callable=step2.setupkafkatopics,
provide_context=True,
)
# STEP 3: Produce data to topic
sensor_C = PythonOperator(
task_id="step_3_solution_task_producetotopic",
python_callable=step3.startproducing,
provide_context=True,
)
# STEP 4: Preprocess the data
sensor_D = PythonOperator(
task_id="step_4_solution_task_preprocess",
python_callable=step4.dopreprocessing,
provide_context=True,
)
# STEP 7: Containerize the solution
sensor_E = PythonOperator(
task_id="step_7_solution_task_visualization",
python_callable=step7.startstreamingengine,
provide_context=True,
)
# STEP 8: Containerize the solution
sensor_F = PythonOperator(
task_id="step_8_solution_task_containerize",
python_callable=step8.dockerit,
provide_context=True,
)
# STEP 9: PrivateGPT
sensor_I = PythonOperator(
task_id="step_9_solution_task_ai",
python_callable=step9.startprivategpt,
provide_context=True,
)
start_task2 = BashOperator(
task_id="Starting_Docker",
bash_command="echo 'Start task Completed'",
)
start_task3 = BashOperator(
task_id="Starting_Documentation",
bash_command="echo 'Start task Completed'",
)
start_task4 = BashOperator(
task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
bash_command="echo 'Start task Completed'",
)
# STEP 10: Document the solution
sensor_G = PythonOperator(
task_id="step_10_solution_task_document",
python_callable=step10.generatedoc,
provide_context=True,
)
start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_I, sensor_C, sensor_D, sensor_E] >> start_task2 >> sensor_F >> start_task3 >> sensor_G
5.5. 3. Solution Template: solution_template_processing_ai_dag_restapi.py
from __future__ import annotations
import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
ExternalPythonOperator,
PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_RESTAPI_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")
with DAG(
dag_id="solution_preprocessing_ai_restapi_dag",
start_date=datetime(2023, 1, 1),
schedule=None,
) as dag:
start_task = BashOperator(
task_id="start_tasks_tml_preprocessing_ai_restapi",
bash_command="echo 'Start task'",
)
# STEP 1: Get the Parameters
sensor_A = PythonOperator(
task_id="step_1_solution_task_getparams",
python_callable=step1.getparams,
provide_context=True,
)
# STEP 2: Create the Kafka topics
sensor_B = PythonOperator(
task_id="step_2_solution_task_createtopic",
python_callable=step2.setupkafkatopics,
provide_context=True,
)
# STEP 3: Produce data to topic
sensor_C = PythonOperator(
task_id="step_3_solution_task_producetotopic",
python_callable=step3.startproducing,
provide_context=True,
)
# STEP 4: Preprocess the data
sensor_D = PythonOperator(
task_id="step_4_solution_task_preprocess",
python_callable=step4.dopreprocessing,
provide_context=True,
)
# STEP 7: Containerize the solution
sensor_E = PythonOperator(
task_id="step_7_solution_task_visualization",
python_callable=step7.startstreamingengine,
provide_context=True,
)
# STEP 8: Containerize the solution
sensor_F = PythonOperator(
task_id="step_8_solution_task_containerize",
python_callable=step8.dockerit,
provide_context=True,
)
# STEP 9: PrivateGPT
sensor_I = PythonOperator(
task_id="step_9_solution_task_ai",
python_callable=step9.startprivategpt,
provide_context=True,
)
start_task2 = BashOperator(
task_id="Starting_Docker",
bash_command="echo 'Start task Completed'",
)
start_task3 = BashOperator(
task_id="Starting_Documentation",
bash_command="echo 'Start task Completed'",
)
start_task4 = BashOperator(
task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
bash_command="echo 'Start task Completed'",
)
# STEP 10: Document the solution
sensor_G = PythonOperator(
task_id="step_10_solution_task_document",
python_callable=step10.generatedoc,
provide_context=True,
)
start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_I, sensor_C, sensor_D, sensor_E] >> start_task2 >> sensor_F >> start_task3 >> sensor_G
5.6. 4. Solution Template: solution_template_processing_ai_dag.py
from __future__ import annotations
import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
ExternalPythonOperator,
PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_LOCALFILE_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")
with DAG(
dag_id="solution_preprocessing_ai_dag",
start_date=datetime(2023, 1, 1),
schedule=None,
) as dag:
start_task = BashOperator(
task_id="start_tasks_tml_preprocessing_ai",
bash_command="echo 'Start task'",
)
# STEP 1: Get the Parameters
sensor_A = PythonOperator(
task_id="step_1_solution_task_getparams",
python_callable=step1.getparams,
provide_context=True,
)
# STEP 2: Create the Kafka topics
sensor_B = PythonOperator(
task_id="step_2_solution_task_createtopic",
python_callable=step2.setupkafkatopics,
provide_context=True,
)
# STEP 3: Produce data to topic
sensor_C = PythonOperator(
task_id="step_3_solution_task_producetotopic",
python_callable=step3.startproducing,
provide_context=True,
)
# STEP 4: Preprocess the data
sensor_D = PythonOperator(
task_id="step_4_solution_task_preprocess",
python_callable=step4.dopreprocessing,
provide_context=True,
)
# STEP 7: Containerize the solution
sensor_E = PythonOperator(
task_id="step_7_solution_task_visualization",
python_callable=step7.startstreamingengine,
provide_context=True,
)
# STEP 8: Containerize the solution
sensor_F = PythonOperator(
task_id="step_8_solution_task_containerize",
python_callable=step8.dockerit,
provide_context=True,
)
# STEP 9: PrivateGPT
sensor_I = PythonOperator(
task_id="step_9_solution_task_ai",
python_callable=step9.startprivategpt,
provide_context=True,
)
start_task2 = BashOperator(
task_id="Starting_Docker",
bash_command="echo 'Start task Completed'",
)
start_task3 = BashOperator(
task_id="Starting_Documentation",
bash_command="echo 'Start task Completed'",
)
start_task4 = BashOperator(
task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
bash_command="echo 'Start task Completed'",
)
# STEP 10: Document the solution
sensor_G = PythonOperator(
task_id="step_10_solution_task_document",
python_callable=step10.generatedoc,
provide_context=True,
)
start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_I, sensor_C, sensor_D, sensor_E] >> start_task2 >> sensor_F >> start_task3 >> sensor_G
5.7. 5. Solution Template: solution_template_processing_dag_grpc.py
from __future__ import annotations
import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
ExternalPythonOperator,
PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_gRPC_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")
with DAG(
dag_id="solution_preprocessing_dag_grpc",
start_date=datetime(2023, 1, 1),
schedule=None,
) as dag:
start_task = BashOperator(
task_id="start_tasks_tml_preprocessing_grpc",
bash_command="echo 'Start task'",
)
# STEP 1: Get the Parameters
sensor_A = PythonOperator(
task_id="step_1_solution_task_getparams",
python_callable=step1.getparams,
provide_context=True,
)
# STEP 2: Create the Kafka topics
sensor_B = PythonOperator(
task_id="step_2_solution_task_createtopic",
python_callable=step2.setupkafkatopics,
provide_context=True,
)
# STEP 3: Produce data to topic
sensor_C = PythonOperator(
task_id="step_3_solution_task_producetotopic",
python_callable=step3.startproducing,
provide_context=True,
)
# STEP 4: Preprocess the data
sensor_D = PythonOperator(
task_id="step_4_solution_task_preprocess",
python_callable=step4.dopreprocessing,
provide_context=True,
)
# STEP 7: Containerize the solution
sensor_E = PythonOperator(
task_id="step_7_solution_task_visualization",
python_callable=step7.startstreamingengine,
provide_context=True,
)
# STEP 8: Containerize the solution
sensor_F = PythonOperator(
task_id="step_8_solution_task_containerize",
python_callable=step8.dockerit,
provide_context=True,
)
start_task2 = BashOperator(
task_id="Starting_Docker",
bash_command="echo 'Start task Completed'",
)
start_task3 = BashOperator(
task_id="Starting_Documentation",
bash_command="echo 'Start task Completed'",
)
start_task4 = BashOperator(
task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
bash_command="echo 'Start task Completed'",
)
# STEP 10: Document the solution
sensor_G = PythonOperator(
task_id="step_10_solution_task_document",
python_callable=step10.generatedoc,
provide_context=True,
)
start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_E] >> start_task2 >> sensor_F >> start_task3 >> sensor_G
5.8. 6. Solution Template: solution_template_processing_dag_mqtt.py
from __future__ import annotations
import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
ExternalPythonOperator,
PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_MQTT_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")
with DAG(
dag_id="solution_preprocessing_dag_mqtt",
start_date=datetime(2023, 1, 1),
schedule=None,
) as dag:
start_task = BashOperator(
task_id="start_tasks_tml_preprocessing_mqtt",
bash_command="echo 'Start task'",
)
# STEP 1: Get the Parameters
sensor_A = PythonOperator(
task_id="step_1_solution_task_getparams",
python_callable=step1.getparams,
provide_context=True,
)
# STEP 2: Create the Kafka topics
sensor_B = PythonOperator(
task_id="step_2_solution_task_createtopic",
python_callable=step2.setupkafkatopics,
provide_context=True,
)
# STEP 3: Produce data to topic
sensor_C = PythonOperator(
task_id="step_3_solution_task_producetotopic",
python_callable=step3.startproducing,
provide_context=True,
)
# STEP 4: Preprocess the data
sensor_D = PythonOperator(
task_id="step_4_solution_task_preprocess",
python_callable=step4.dopreprocessing,
provide_context=True,
)
# STEP 7: Containerize the solution
sensor_E = PythonOperator(
task_id="step_7_solution_task_visualization",
python_callable=step7.startstreamingengine,
provide_context=True,
)
# STEP 8: Containerize the solution
sensor_F = PythonOperator(
task_id="step_8_solution_task_containerize",
python_callable=step8.dockerit,
provide_context=True,
)
start_task2 = BashOperator(
task_id="Starting_Docker",
bash_command="echo 'Start task Completed'",
)
start_task3 = BashOperator(
task_id="Starting_Documentation",
bash_command="echo 'Start task Completed'",
)
start_task4 = BashOperator(
task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
bash_command="echo 'Start task Completed'",
)
# STEP 10: Document the solution
sensor_G = PythonOperator(
task_id="step_10_solution_task_document",
python_callable=step10.generatedoc,
provide_context=True,
)
start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_E] >> start_task2 >> sensor_F >> start_task3 >> sensor_G
5.9. 7. Solution Template: solution_template_processing_dag_restapi.py
from __future__ import annotations
import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
ExternalPythonOperator,
PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_RESTAPI_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")
with DAG(
dag_id="solution_preprocessing_dag_restapi",
start_date=datetime(2023, 1, 1),
schedule=None,
) as dag:
start_task = BashOperator(
task_id="start_tasks_tml_preprocessing_restapi",
bash_command="echo 'Start task'",
)
# STEP 1: Get the Parameters
sensor_A = PythonOperator(
task_id="step_1_solution_task_getparams",
python_callable=step1.getparams,
provide_context=True,
)
# STEP 2: Create the Kafka topics
sensor_B = PythonOperator(
task_id="step_2_solution_task_createtopic",
python_callable=step2.setupkafkatopics,
provide_context=True,
)
# STEP 3: Produce data to topic
sensor_C = PythonOperator(
task_id="step_3_solution_task_producetotopic",
python_callable=step3.startproducing,
provide_context=True,
)
# STEP 4: Preprocess the data
sensor_D = PythonOperator(
task_id="step_4_solution_task_preprocess",
python_callable=step4.dopreprocessing,
provide_context=True,
)
# STEP 7: Containerize the solution
sensor_E = PythonOperator(
task_id="step_7_solution_task_visualization",
python_callable=step7.startstreamingengine,
provide_context=True,
)
# STEP 8: Containerize the solution
sensor_F = PythonOperator(
task_id="step_8_solution_task_containerize",
python_callable=step8.dockerit,
provide_context=True,
)
start_task2 = BashOperator(
task_id="Starting_Docker",
bash_command="echo 'Start task Completed'",
)
start_task3 = BashOperator(
task_id="Starting_Documentation",
bash_command="echo 'Start task Completed'",
)
start_task4 = BashOperator(
task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
bash_command="echo 'Start task Completed'",
)
# STEP 10: Document the solution
sensor_G = PythonOperator(
task_id="step_10_solution_task_document",
python_callable=step10.generatedoc,
provide_context=True,
)
start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_E] >> start_task2 >> sensor_F >> start_task3 >> sensor_G
5.10. 8. Solution Template: solution_template_processing_dag.py
from __future__ import annotations
import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
ExternalPythonOperator,
PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_LOCALFILE_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")
with DAG(
dag_id="solution_preprocessing_dag",
start_date=datetime(2023, 1, 1),
schedule=None,
) as dag:
start_task = BashOperator(
task_id="start_tasks_tml_preprocessing",
bash_command="echo 'Start task'",
)
# STEP 1: Get the Parameters
sensor_A = PythonOperator(
task_id="step_1_solution_task_getparams",
python_callable=step1.getparams,
provide_context=True,
)
# STEP 2: Create the Kafka topics
sensor_B = PythonOperator(
task_id="step_2_solution_task_createtopic",
python_callable=step2.setupkafkatopics,
provide_context=True,
)
# STEP 3: Produce data to topic
sensor_C = PythonOperator(
task_id="step_3_solution_task_producetotopic",
python_callable=step3.startproducing,
provide_context=True,
)
# STEP 4: Preprocess the data
sensor_D = PythonOperator(
task_id="step_4_solution_task_preprocess",
python_callable=step4.dopreprocessing,
provide_context=True,
)
# STEP 7: Containerize the solution
sensor_E = PythonOperator(
task_id="step_7_solution_task_visualization",
python_callable=step7.startstreamingengine,
provide_context=True,
)
# STEP 8: Containerize the solution
sensor_F = PythonOperator(
task_id="step_8_solution_task_containerize",
python_callable=step8.dockerit,
provide_context=True,
)
start_task2 = BashOperator(
task_id="Starting_Docker",
bash_command="echo 'Start task Completed'",
)
start_task3 = BashOperator(
task_id="Starting_Documentation",
bash_command="echo 'Start task Completed'",
)
start_task4 = BashOperator(
task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
bash_command="echo 'Start task Completed'",
)
# STEP 10: Document the solution
sensor_G = PythonOperator(
task_id="step_10_solution_task_document",
python_callable=step10.generatedoc,
provide_context=True,
)
start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_E] >> start_task2 >> sensor_F >> start_task3 >> sensor_G
5.11. 9. Solution Template: solution_template_processing_ml_ai_dag_grpc.py
from __future__ import annotations
import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
ExternalPythonOperator,
PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_gRPC_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")
with DAG(
dag_id="solution_preprocessing_ml_ai_grpc_dag",
start_date=datetime(2023, 1, 1),
schedule=None,
) as dag:
start_task = BashOperator(
task_id="start_tasks_tml_preprocessing_ml_ai",
bash_command="echo 'Start task'",
)
# STEP 1: Get the Parameters
sensor_A = PythonOperator(
task_id="step_1_solution_task_getparams",
python_callable=step1.getparams,
provide_context=True,
)
# STEP 2: Create the Kafka topics
sensor_B = PythonOperator(
task_id="step_2_solution_task_createtopic",
python_callable=step2.setupkafkatopics,
provide_context=True,
)
# STEP 3: Produce data to topic
sensor_C = PythonOperator(
task_id="step_3_solution_task_producetotopic",
python_callable=step3.startproducing,
provide_context=True,
)
# STEP 4: Preprocess the data
sensor_D = PythonOperator(
task_id="step_4_solution_task_preprocess",
python_callable=step4.dopreprocessing,
provide_context=True,
)
# STEP 5: ML
sensor_E = PythonOperator(
task_id="step_5_solution_task_ml",
python_callable=step5.startml,
provide_context=True,
)
# STEP 6: Predictions
sensor_F = PythonOperator(
task_id="step_6_solution_task_prediction",
python_callable=step6.startpredictions,
provide_context=True,
)
# STEP 7: Visualization the solution
sensor_G = PythonOperator(
task_id="step_7_solution_task_visualization",
python_callable=step7.startstreamingengine,
provide_context=True,
)
# STEP 8: Containerize the solution
sensor_H = PythonOperator(
task_id="step_8_solution_task_containerize",
python_callable=step8.dockerit,
provide_context=True,
)
# STEP 9: PrivateGPT
sensor_I = PythonOperator(
task_id="step_9_solution_task_ai",
python_callable=step9.startprivategpt,
provide_context=True,
)
start_task2 = BashOperator(
task_id="Starting_Docker",
bash_command="echo 'Start task Completed'",
)
start_task3 = BashOperator(
task_id="Starting_Documentation",
bash_command="echo 'Start task Completed'",
)
start_task4 = BashOperator(
task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
bash_command="echo 'Start task Completed'",
)
# STEP 10: Document the solution
sensor_J = PythonOperator(
task_id="step_10_solution_task_document",
python_callable=step10.generatedoc,
provide_context=True,
)
start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_I, sensor_C, sensor_D, sensor_E, sensor_F, sensor_G] >> start_task2 >> sensor_H >> start_task3 >> sensor_J
5.12. 10. Solution Template: solution_template_processing_ml_ai_dag_mqtt.py
from __future__ import annotations
import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
ExternalPythonOperator,
PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_MQTT_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")
with DAG(
dag_id="solution_preprocessing_ml_ai_mqtt_dag",
start_date=datetime(2023, 1, 1),
schedule=None,
) as dag:
start_task = BashOperator(
task_id="start_tasks_tml_preprocessing_ml_ai",
bash_command="echo 'Start task'",
)
# STEP 1: Get the Parameters
sensor_A = PythonOperator(
task_id="step_1_solution_task_getparams",
python_callable=step1.getparams,
provide_context=True,
)
# STEP 2: Create the Kafka topics
sensor_B = PythonOperator(
task_id="step_2_solution_task_createtopic",
python_callable=step2.setupkafkatopics,
provide_context=True,
)
# STEP 3: Produce data to topic
sensor_C = PythonOperator(
task_id="step_3_solution_task_producetotopic",
python_callable=step3.startproducing,
provide_context=True,
)
# STEP 4: Preprocess the data
sensor_D = PythonOperator(
task_id="step_4_solution_task_preprocess",
python_callable=step4.dopreprocessing,
provide_context=True,
)
# STEP 5: ML
sensor_E = PythonOperator(
task_id="step_5_solution_task_ml",
python_callable=step5.startml,
provide_context=True,
)
# STEP 6: Predictions
sensor_F = PythonOperator(
task_id="step_6_solution_task_prediction",
python_callable=step6.startpredictions,
provide_context=True,
)
# STEP 7: Visualization the solution
sensor_G = PythonOperator(
task_id="step_7_solution_task_visualization",
python_callable=step7.startstreamingengine,
provide_context=True,
)
# STEP 8: Containerize the solution
sensor_H = PythonOperator(
task_id="step_8_solution_task_containerize",
python_callable=step8.dockerit,
provide_context=True,
)
# STEP 9: PrivateGPT
sensor_I = PythonOperator(
task_id="step_9_solution_task_ai",
python_callable=step9.startprivategpt,
provide_context=True,
)
start_task2 = BashOperator(
task_id="Starting_Docker",
bash_command="echo 'Start task Completed'",
)
start_task3 = BashOperator(
task_id="Starting_Documentation",
bash_command="echo 'Start task Completed'",
)
start_task4 = BashOperator(
task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
bash_command="echo 'Start task Completed'",
)
# STEP 10: Document the solution
sensor_J = PythonOperator(
task_id="step_10_solution_task_document",
python_callable=step10.generatedoc,
provide_context=True,
)
start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_I, sensor_C, sensor_D, sensor_E, sensor_F, sensor_G] >> start_task2 >> sensor_H >> start_task3 >> sensor_J
5.13. 11. Solution Template: solution_template_processing_ml_ai_dag_restapi.py
from __future__ import annotations
import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
ExternalPythonOperator,
PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_RESTAPI_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")
with DAG(
dag_id="solution_preprocessing_ml_ai_restapi_dag",
start_date=datetime(2023, 1, 1),
schedule=None,
) as dag:
start_task = BashOperator(
task_id="start_tasks_tml_preprocessing_ml_ai",
bash_command="echo 'Start task'",
)
# STEP 1: Get the Parameters
sensor_A = PythonOperator(
task_id="step_1_solution_task_getparams",
python_callable=step1.getparams,
provide_context=True,
)
# STEP 2: Create the Kafka topics
sensor_B = PythonOperator(
task_id="step_2_solution_task_createtopic",
python_callable=step2.setupkafkatopics,
provide_context=True,
)
# STEP 3: Produce data to topic
sensor_C = PythonOperator(
task_id="step_3_solution_task_producetotopic",
python_callable=step3.startproducing,
provide_context=True,
)
# STEP 4: Preprocess the data
sensor_D = PythonOperator(
task_id="step_4_solution_task_preprocess",
python_callable=step4.dopreprocessing,
provide_context=True,
)
# STEP 5: ML
sensor_E = PythonOperator(
task_id="step_5_solution_task_ml",
python_callable=step5.startml,
provide_context=True,
)
# STEP 6: Predictions
sensor_F = PythonOperator(
task_id="step_6_solution_task_prediction",
python_callable=step6.startpredictions,
provide_context=True,
)
# STEP 7: Visualization the solution
sensor_G = PythonOperator(
task_id="step_7_solution_task_visualization",
python_callable=step7.startstreamingengine,
provide_context=True,
)
# STEP 8: Containerize the solution
sensor_H = PythonOperator(
task_id="step_8_solution_task_containerize",
python_callable=step8.dockerit,
provide_context=True,
)
# STEP 9: PrivateGPT
sensor_I = PythonOperator(
task_id="step_9_solution_task_ai",
python_callable=step9.startprivategpt,
provide_context=True,
)
start_task2 = BashOperator(
task_id="Starting_Docker",
bash_command="echo 'Start task Completed'",
)
start_task3 = BashOperator(
task_id="Starting_Documentation",
bash_command="echo 'Start task Completed'",
)
start_task4 = BashOperator(
task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
bash_command="echo 'Start task Completed'",
)
# STEP 10: Document the solution
sensor_J = PythonOperator(
task_id="step_10_solution_task_document",
python_callable=step10.generatedoc,
provide_context=True,
)
start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_I, sensor_C, sensor_D, sensor_E, sensor_F, sensor_G] >> start_task2 >> sensor_H >> start_task3 >> sensor_J
5.14. 12. Solution Template: solution_template_processing_ml_ai_dag.py
from __future__ import annotations
import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
ExternalPythonOperator,
PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_LOCALFILE_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")
with DAG(
dag_id="solution_preprocessing_ml_ai_dag",
start_date=datetime(2023, 1, 1),
schedule=None,
) as dag:
start_task = BashOperator(
task_id="start_tasks_tml_preprocessing",
bash_command="echo 'Start task'",
)
# STEP 1: Get the Parameters
sensor_A = PythonOperator(
task_id="step_1_solution_task_getparams",
python_callable=step1.getparams,
provide_context=True,
)
# STEP 2: Create the Kafka topics
sensor_B = PythonOperator(
task_id="step_2_solution_task_createtopic",
python_callable=step2.setupkafkatopics,
provide_context=True,
)
# STEP 3: Produce data to topic
sensor_C = PythonOperator(
task_id="step_3_solution_task_producetotopic",
python_callable=step3.startproducing,
provide_context=True,
)
# STEP 4: Preprocess the data
sensor_D = PythonOperator(
task_id="step_4_solution_task_preprocess",
python_callable=step4.dopreprocessing,
provide_context=True,
)
# STEP 5: ML
sensor_E = PythonOperator(
task_id="step_5_solution_task_ml",
python_callable=step5.startml,
provide_context=True,
)
# STEP 6: Predictions
sensor_F = PythonOperator(
task_id="step_6_solution_task_prediction",
python_callable=step6.startpredictions,
provide_context=True,
)
# STEP 7: Visualization the solution
sensor_G = PythonOperator(
task_id="step_7_solution_task_visualization",
python_callable=step7.startstreamingengine,
provide_context=True,
)
# STEP 8: Containerize the solution
sensor_H = PythonOperator(
task_id="step_8_solution_task_containerize",
python_callable=step8.dockerit,
provide_context=True,
)
# STEP 9: PrivateGPT
sensor_I = PythonOperator(
task_id="step_9_solution_task_ai",
python_callable=step9.startprivategpt,
provide_context=True,
)
start_task2 = BashOperator(
task_id="Starting_Docker",
bash_command="echo 'Start task Completed'",
)
start_task3 = BashOperator(
task_id="Starting_Documentation",
bash_command="echo 'Start task Completed'",
)
start_task4 = BashOperator(
task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
bash_command="echo 'Start task Completed'",
)
# STEP 10: Document the solution
sensor_J = PythonOperator(
task_id="step_10_solution_task_document",
python_callable=step10.generatedoc,
provide_context=True,
)
start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_I, sensor_C, sensor_D, sensor_E, sensor_F, sensor_G] >> start_task2 >> sensor_H >> start_task3 >> sensor_J
5.15. 13. Solution Template: solution_template_processing_ml_dag_grpc.py
from __future__ import annotations
import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
ExternalPythonOperator,
PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_gRPC_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")
with DAG(
dag_id="solution_preprocessing_ml_grpc_dag",
start_date=datetime(2023, 1, 1),
schedule=None,
) as dag:
start_task = BashOperator(
task_id="start_tasks_tml_preprocessing_ml_grpc",
bash_command="echo 'Start task'",
)
# STEP 1: Get the Parameters
sensor_A = PythonOperator(
task_id="step_1_solution_task_getparams",
python_callable=step1.getparams,
provide_context=True,
)
# STEP 2: Create the Kafka topics
sensor_B = PythonOperator(
task_id="step_2_solution_task_createtopic",
python_callable=step2.setupkafkatopics,
provide_context=True,
)
# STEP 3: Produce data to topic
sensor_C = PythonOperator(
task_id="step_3_solution_task_producetotopic",
python_callable=step3.startproducing,
provide_context=True,
)
# STEP 4: Preprocess the data
sensor_D = PythonOperator(
task_id="step_4_solution_task_preprocess",
python_callable=step4.dopreprocessing,
provide_context=True,
)
# STEP 5: ML
sensor_E = PythonOperator(
task_id="step_5_solution_task_ml",
python_callable=step5.startml,
provide_context=True,
)
# STEP 6: Predictions
sensor_F = PythonOperator(
task_id="step_6_solution_task_prediction",
python_callable=step6.startpredictions,
provide_context=True,
)
# STEP 7: Visualization the solution
sensor_G = PythonOperator(
task_id="step_7_solution_task_visualization",
python_callable=step7.startstreamingengine,
provide_context=True,
)
- # STEP 8: Containerize the solution
- sensor_H = PythonOperator(
task_id=”step_8_solution_task_containerize”, python_callable=step8.dockerit, provide_context=True,
) start_task2 = BashOperator(
task_id=”Starting_Docker”, bash_command=”echo ‘Start task Completed’”,
) start_task3 = BashOperator(
task_id=”Starting_Documentation”, bash_command=”echo ‘Start task Completed’”,
) start_task4 = BashOperator(
task_id=”Completed_TML_Setup_Now_Spawn_Main_Processes”, bash_command=”echo ‘Start task Completed’”,
)
- # STEP 10: Document the solution
- sensor_J = PythonOperator(
task_id=”step_10_solution_task_document”, python_callable=step10.generatedoc, provide_context=True,
)
start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_E, sensor_F, sensor_G] >> start_task2 >> sensor_H >> start_task3 >> sensor_J
5.16. 14. Solution Template: solution_template_processing_ml_dag_mqtt.py
from __future__ import annotations
import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
ExternalPythonOperator,
PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_MQTT_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")
with DAG(
dag_id="solution_preprocessing_ml_mqtt_dag",
start_date=datetime(2023, 1, 1),
schedule=None,
) as dag:
start_task = BashOperator(
task_id="start_tasks_tml_preprocessing_ml_mqtt",
bash_command="echo 'Start task'",
)
# STEP 1: Get the Parameters
sensor_A = PythonOperator(
task_id="step_1_solution_task_getparams",
python_callable=step1.getparams,
provide_context=True,
)
# STEP 2: Create the Kafka topics
sensor_B = PythonOperator(
task_id="step_2_solution_task_createtopic",
python_callable=step2.setupkafkatopics,
provide_context=True,
)
# STEP 3: Produce data to topic
sensor_C = PythonOperator(
task_id="step_3_solution_task_producetotopic",
python_callable=step3.startproducing,
provide_context=True,
)
# STEP 4: Preprocess the data
sensor_D = PythonOperator(
task_id="step_4_solution_task_preprocess",
python_callable=step4.dopreprocessing,
provide_context=True,
)
# STEP 5: ML
sensor_E = PythonOperator(
task_id="step_5_solution_task_ml",
python_callable=step5.startml,
provide_context=True,
)
# STEP 6: Predictions
sensor_F = PythonOperator(
task_id="step_6_solution_task_prediction",
python_callable=step6.startpredictions,
provide_context=True,
)
# STEP 7: Visualization the solution
sensor_G = PythonOperator(
task_id="step_7_solution_task_visualization",
python_callable=step7.startstreamingengine,
provide_context=True,
)
# STEP 8: Containerize the solution
sensor_H = PythonOperator(
task_id="step_8_solution_task_containerize",
python_callable=step8.dockerit,
provide_context=True,
)
start_task2 = BashOperator(
task_id="Starting_Docker",
bash_command="echo 'Start task Completed'",
)
start_task3 = BashOperator(
task_id="Starting_Documentation",
bash_command="echo 'Start task Completed'",
)
start_task4 = BashOperator(
task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
bash_command="echo 'Start task Completed'",
)
# STEP 10: Document the solution
sensor_J = PythonOperator(
task_id="step_10_solution_task_document",
python_callable=step10.generatedoc,
provide_context=True,
)
start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_E, sensor_F, sensor_G] >> start_task2 >> sensor_H >> start_task3 >> sensor_J
5.17. 15. Solution Template: solution_template_processing_ml_dag_restapi.py
from __future__ import annotations
import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
ExternalPythonOperator,
PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_RESTAPI_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")
with DAG(
dag_id="solution_preprocessing_ml_restapi_dag",
start_date=datetime(2023, 1, 1),
schedule=None,
) as dag:
start_task = BashOperator(
task_id="start_tasks_tml_preprocessing_ml_restapi",
bash_command="echo 'Start task'",
)
# STEP 1: Get the Parameters
sensor_A = PythonOperator(
task_id="step_1_solution_task_getparams",
python_callable=step1.getparams,
provide_context=True,
)
# STEP 2: Create the Kafka topics
sensor_B = PythonOperator(
task_id="step_2_solution_task_createtopic",
python_callable=step2.setupkafkatopics,
provide_context=True,
)
# STEP 3: Produce data to topic
sensor_C = PythonOperator(
task_id="step_3_solution_task_producetotopic",
python_callable=step3.startproducing,
provide_context=True,
)
# STEP 4: Preprocess the data
sensor_D = PythonOperator(
task_id="step_4_solution_task_preprocess",
python_callable=step4.dopreprocessing,
provide_context=True,
)
# STEP 5: ML
sensor_E = PythonOperator(
task_id="step_5_solution_task_ml",
python_callable=step5.startml,
provide_context=True,
)
# STEP 6: Predictions
sensor_F = PythonOperator(
task_id="step_6_solution_task_prediction",
python_callable=step6.startpredictions,
provide_context=True,
)
# STEP 7: Visualization the solution
sensor_G = PythonOperator(
task_id="step_7_solution_task_visualization",
python_callable=step7.startstreamingengine,
provide_context=True,
)
# STEP 8: Containerize the solution
sensor_H = PythonOperator(
task_id="step_8_solution_task_containerize",
python_callable=step8.dockerit,
provide_context=True,
)
start_task2 = BashOperator(
task_id="Starting_Docker",
bash_command="echo 'Start task Completed'",
)
start_task3 = BashOperator(
task_id="Starting_Documentation",
bash_command="echo 'Start task Completed'",
)
start_task4 = BashOperator(
task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
bash_command="echo 'Start task Completed'",
)
# STEP 10: Document the solution
sensor_J = PythonOperator(
task_id="step_10_solution_task_document",
python_callable=step10.generatedoc,
provide_context=True,
)
start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_E, sensor_F, sensor_G] >> start_task2 >> sensor_H >> start_task3 >> sensor_J
5.18. 16. Solution Template: solution_template_processing_ml_dag.py
from __future__ import annotations
import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
ExternalPythonOperator,
PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_LOCALFILE_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")
with DAG(
dag_id="solution_preprocessing_ml_dag",
start_date=datetime(2023, 1, 1),
schedule=None,
) as dag:
start_task = BashOperator(
task_id="start_tasks_tml_preprocessing_ml",
bash_command="echo 'Start task'",
)
# STEP 1: Get the Parameters
sensor_A = PythonOperator(
task_id="step_1_solution_task_getparams",
python_callable=step1.getparams,
provide_context=True,
)
# STEP 2: Create the Kafka topics
sensor_B = PythonOperator(
task_id="step_2_solution_task_createtopic",
python_callable=step2.setupkafkatopics,
provide_context=True,
)
# STEP 3: Produce data to topic
sensor_C = PythonOperator(
task_id="step_3_solution_task_producetotopic",
python_callable=step3.startproducing,
provide_context=True,
)
# STEP 4: Preprocess the data
sensor_D = PythonOperator(
task_id="step_4_solution_task_preprocess",
python_callable=step4.dopreprocessing,
provide_context=True,
)
# STEP 5: ML
sensor_E = PythonOperator(
task_id="step_5_solution_task_ml",
python_callable=step5.startml,
provide_context=True,
)
# STEP 6: Predictions
sensor_F = PythonOperator(
task_id="step_6_solution_task_prediction",
python_callable=step6.startpredictions,
provide_context=True,
)
# STEP 7: Visualization the solution
sensor_G = PythonOperator(
task_id="step_7_solution_task_visualization",
python_callable=step7.startstreamingengine,
provide_context=True,
)
# STEP 8: Containerize the solution
sensor_H = PythonOperator(
task_id="step_8_solution_task_containerize",
python_callable=step8.dockerit,
provide_context=True,
)
start_task2 = BashOperator(
task_id="Starting_Docker",
bash_command="echo 'Start task Completed'",
)
start_task3 = BashOperator(
task_id="Starting_Documentation",
bash_command="echo 'Start task Completed'",
)
start_task4 = BashOperator(
task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
bash_command="echo 'Start task Completed'",
)
# STEP 10: Document the solution
sensor_J = PythonOperator(
task_id="step_10_solution_task_document",
python_callable=step10.generatedoc,
provide_context=True,
)
start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_E, sensor_F, sensor_G] >> start_task2 >> sensor_H >> start_task3 >> sensor_J
5.19. How To Read a Solution Template
5.19.1. 1. import files
Consider the following import files:
step1 = importlib.import_module(“tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_1_getparams_dag-myawesometmlsolutionml-3f10”)
step2 = importlib.import_module(“tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_2_kafka_createtopic_dag-myawesometmlsolutionml-3f10”)
step3 = importlib.import_module(“tml-solutions.myawesometmlsolutionml-3f10.tml_read_LOCALFILE_step_3_kafka_producetotopic_dag-myawesometmlsolutionml-3f10”)
step4 = importlib.import_module(“tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_4_kafka_preprocess_dag-myawesometmlsolutionml-3f10”)
step5 = importlib.import_module(“tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_5_kafka_machine_learning_dag-myawesometmlsolutionml-3f10”)
step6 = importlib.import_module(“tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_6_kafka_predictions_dag-myawesometmlsolutionml-3f10”)
step7 = importlib.import_module(“tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_7_kafka_visualization_dag-myawesometmlsolutionml-3f10”)
step8 = importlib.import_module(“tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_8_deploy_solution_to_docker_dag-myawesometmlsolutionml-3f10”)
step9 = importlib.import_module(“tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_9_privategpt_qdrant_dag-myawesometmlsolutionml-3f10”)
step10 = importlib.import_module(“tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_10_documentation_dag-myawesometmlsolutionml-3f10”)
Each of these steps holds the code definition of the DAG. For example:
step1 = importlib.import_module(“tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_1_getparams_dag-myawesometmlsolutionml-3f10”)
tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_1_getparams_dag-myawesometmlsolutionml-3f10 is a physical python file located in the folder:
<your repo>/tml-airflow/dags/tml-solutions/myawesometmlsolutionml-3f10/tml_system_step_1_getparams_dag-myawesometmlsolutionml-3f10.py
For this example it is here: https://github.com/smaurice101/raspberrypi/blob/main/tml-airflow/dags/tml-solutions/myawesometmlsolutionml-3f10/tml_system_step_1_getparams_dag-myawesometmlsolutionml-3f10.py
5.19.2. 2. tasks to execute
Keeping with the above example. We can create the task for Step 1:
- STEP 1: Get the Parameters
- sensor_A = PythonOperator(
task_id=”step_1_solution_task_getparams”, python_callable=step1.getparams, provide_context=True,
)
python_callable=step1.getparams,
Notice step1.getparams, this will now call the function getparams in the file https://github.com/smaurice101/raspberrypi/blob/main/tml- airflow/dags/tml-solutions/myawesometmlsolutionml-3f10/tml_system_step_1_getparams_dag-myawesometmlsolutionml-3f10.py
And assign the operation to sensor_A - similarly we do this with other tasks.
5.19.3. 3. tasks groupings
We can now group the tasks into a complete end-end TML solution:
start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_E, sensor_F, sensor_G] >> start_task2 >> sensor_H >> start_task3 >> sensor_J
Note
These tasks will run sequentially. The tasks in the square brackets, [sensor_C, sensor_D, sensor_E, sensor_F, sensor_G], will run in parallel. The entire process is shown in the graph below.
5.20. Running A Solution Container
Follow the instructions here Lets Start Building a TML Solution