5. TML Solution Templates

TML solution templates are designed to dramatically accelerate TML solution development. A solution can be built in a few minutes, when it would normally take companies weeks or months to build end-to-end real-time solutions at scale. The templates integrate advanced machine learning, GenAI, automated Docker container deployments, automated documentation, and automated GitHub code commits.

Solution templates require NO CODE or configuration. Just RUN THEM in TSS! An example that makes this easy to understand is shown in Running A Solution Container.

Important

ALL TML SOLUTIONS MUST BE DEVELOPED USING “COPIES” OF THESE SOLUTION TEMPLATES IN YOUR TML PROJECT FOLDER. Copies of these solution templates are automatically made for you when you create a TML project. For details, refer to Lets Start Building a TML Solution.

These solution templates execute all of the TML DAGs described in DAG Solution Process Explanation.

All you do is configure the parameters in the TML DAGs and RUN the Solution Templates. THAT IS IT!

5.1. The Solution Template Naming Conventions

The names of the solution templates tell you what each solution does. Each solution template performs one or more of these functions:

  1. processing - using ingested data from: local file, MQTT, gRPC, REST

  2. ML/predictions - using ingested data from: local file, MQTT, gRPC, REST

  3. AI - using ingested data from: local file, MQTT, gRPC, REST

5.1.1. For example:

1. solution_preprocessing_dag: Is doing ONLY preprocessing using a LOCAL FILE. The name does not mention MQTT, REST, or gRPC, so it defaults to local file

2. solution_preprocessing_dag_mqtt: Is doing preprocessing using MQTT

3. solution_preprocessing_ai_grpc_dag: Is doing preprocessing AND AI using gRPC

4. solution_preprocessing_ml_ai_mqtt: Is doing preprocessing, ML AND AI using MQTT

5. solution_preprocessing_ml_mqtt_dag: Is doing preprocessing AND ML using MQTT

6. solution_preprocessing_ml_dag: Is doing preprocessing AND ML using a LOCAL FILE

And so on…
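The naming convention above can be read mechanically. The following is a minimal sketch, not part of TML: the helper `describe_template` and the `PROTOCOLS` table are hypothetical names introduced here for illustration, and the sketch assumes the function and protocol keywords always appear as underscore-separated tokens in the name.

```python
# Hypothetical lookup table: name token -> ingestion method (an assumption
# based on the naming examples above, not a TML API).
PROTOCOLS = {"mqtt": "MQTT", "grpc": "gRPC", "restapi": "REST"}


def describe_template(name: str) -> dict:
    """Infer the functions and ingestion method from a template or DAG name."""
    stem = name[:-3] if name.endswith(".py") else name
    tokens = stem.lower().split("_")

    # Every template preprocesses; ML and AI are optional extras.
    functions = ["preprocessing"]
    if "ml" in tokens:
        functions.append("ML")
    if "ai" in tokens:
        functions.append("AI")

    # No protocol token in the name means the template reads a local file.
    ingestion = next((PROTOCOLS[t] for t in tokens if t in PROTOCOLS), "local file")
    return {"functions": functions, "ingestion": ingestion}


if __name__ == "__main__":
    for dag_name in ("solution_preprocessing_dag",
                     "solution_preprocessing_ai_grpc_dag",
                     "solution_preprocessing_ml_mqtt_dag"):
        print(dag_name, "->", describe_template(dag_name))
```

Because the sketch looks at whole tokens, the position of `dag` in the name does not matter, which matches the mixed ordering seen in the examples.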

5.2. Here are the solution templates provided

Solution Templates

1. Solution Template: solution_template_processing_ai_dag_grpc.py

This template will analyse ANY real-time data using the gRPC protocol. See gRPC Reference Architecture with integration with GenAI for real-time AI analysis of TML output data.

2. Solution Template: solution_template_processing_ai_dag_mqtt.py

This template will analyse ANY real-time data using the MQTT protocol. See MQTT Reference Architecture with integration with GenAI for real-time AI analysis of TML output data.

3. Solution Template: solution_template_processing_ai_dag_restapi.py

This template will analyse ANY real-time data using the REST protocol. See REST API Reference Architecture with integration with GenAI for real-time AI analysis of TML output data.

4. Solution Template: solution_template_processing_ai_dag.py

This solution template will read a local file from the file system and stream it to the TML solution for processing, with integration with GenAI for further processing.

5. Solution Template: solution_template_processing_dag_grpc.py

This solution template will process data ingested using gRPC.

6. Solution Template: solution_template_processing_dag_mqtt.py

This solution template will process data ingested using the MQTT protocol.

7. Solution Template: solution_template_processing_dag_restapi.py

This solution template will process data ingested using the REST API.

8. Solution Template: solution_template_processing_dag.py

This solution template will process data ingested from a local file.

9. Solution Template: solution_template_processing_ml_ai_dag_grpc.py

This solution template will process data, perform machine learning and perform AI on the output data, while ingesting data using the gRPC protocol.

10. Solution Template: solution_template_processing_ml_ai_dag_mqtt.py

This solution template will process data, perform machine learning and perform AI on the output data, while ingesting data using the MQTT protocol.

11. Solution Template: solution_template_processing_ml_ai_dag_restapi.py

This solution template will process data, perform machine learning and perform AI on the output data, while ingesting data using the REST API.

12. Solution Template: solution_template_processing_ml_ai_dag.py

This solution template will process data, perform machine learning and perform AI on the output data, while ingesting data from a local file.

13. Solution Template: solution_template_processing_ml_dag_grpc.py

This solution template will process data, perform machine learning and predictions, while ingesting data using the gRPC protocol.

14. Solution Template: solution_template_processing_ml_dag_mqtt.py

This solution template will process data, perform machine learning and predictions, while ingesting data using the MQTT protocol.

15. Solution Template: solution_template_processing_ml_dag_restapi.py

This solution template will process data, perform machine learning and predictions, while ingesting data using the REST API.

16. Solution Template: solution_template_processing_ml_dag.py

This solution template will process data, perform machine learning and predictions, while ingesting data from a local file.
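The sixteen file names listed above follow one pattern: a function part (AI, processing only, ML and AI, or ML) combined with an ingestion suffix (gRPC, MQTT, REST API, or none for local file). The sketch below enumerates them under the assumption that this pattern holds for every template; `FUNCTION_PARTS`, `PROTOCOL_SUFFIXES`, and `template_filenames` are hypothetical names used only for illustration.

```python
from itertools import product

# Assumed building blocks of a template file name (derived from the list above).
FUNCTION_PARTS = ["_ai", "", "_ml_ai", "_ml"]           # "" = processing only
PROTOCOL_SUFFIXES = ["_grpc", "_mqtt", "_restapi", ""]  # "" = local file


def template_filenames() -> list[str]:
    """Return the template file names implied by the naming pattern."""
    return [
        f"solution_template_processing{funcs}_dag{proto}.py"
        for funcs, proto in product(FUNCTION_PARTS, PROTOCOL_SUFFIXES)
    ]


if __name__ == "__main__":
    for number, filename in enumerate(template_filenames(), start=1):
        print(f"{number}. {filename}")
```

A quick way to pick a template is therefore to decide which functions you need and how the data arrives, then select the file name with those two parts.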

5.3. 1. Solution Template: solution_template_processing_ai_dag_grpc.py

from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_gRPC_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")


with DAG(
    dag_id="solution_preprocessing_ai_grpc_dag",
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
  start_task = BashOperator(
    task_id="start_tasks_tml_preprocessing_ai_grpc",
    bash_command="echo 'Start task'",
  )
# STEP 1: Get the Parameters
  sensor_A = PythonOperator(
            task_id="step_1_solution_task_getparams",
            python_callable=step1.getparams,
            provide_context=True,
  )

# STEP 2: Create the Kafka topics
  sensor_B = PythonOperator(
      task_id="step_2_solution_task_createtopic",
      python_callable=step2.setupkafkatopics,
      provide_context=True,
  )
# STEP 3: Produce data to topic
  sensor_C = PythonOperator(
      task_id="step_3_solution_task_producetotopic",
      python_callable=step3.startproducing,
      provide_context=True,
  )
# STEP 4: Preprocess the data
  sensor_D = PythonOperator(
      task_id="step_4_solution_task_preprocess",
      python_callable=step4.dopreprocessing,
      provide_context=True,
  )
# STEP 7: Visualization
  sensor_E = PythonOperator(
      task_id="step_7_solution_task_visualization",
      python_callable=step7.startstreamingengine,
      provide_context=True,
  )
# STEP 8: Containerize the solution
  sensor_F = PythonOperator(
      task_id="step_8_solution_task_containerize",
      python_callable=step8.dockerit,
      provide_context=True,
      op_args=['ai'],
  )
# STEP 9: PrivateGPT
  sensor_I = PythonOperator(
      task_id="step_9_solution_task_ai",
      python_callable=step9.startprivategpt,
      provide_context=True,
  )
  start_task2 = BashOperator(
    task_id="Starting_Docker",
    bash_command="echo 'Start task Completed'",
  )
  start_task3 = BashOperator(
    task_id="Starting_Documentation",
    bash_command="echo 'Start task Completed'",
  )
  start_task4 = BashOperator(
    task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
    bash_command="echo 'Start task Completed'",
  )
# STEP 10: Document the solution
  sensor_G = PythonOperator(
      task_id="step_10_solution_task_document",
      python_callable=step10.generatedoc,
      provide_context=True,
  )

  start_task >> sensor_A >> sensor_B  >> start_task4 >> [sensor_I, sensor_C, sensor_D, sensor_E] >> start_task2 >> sensor_F >> start_task3  >> sensor_G
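The last line of this template defines the task order. In Airflow, `a >> [b, c] >> d` makes `b` and `c` downstream of `a` and upstream of `d`, so the four tasks in the list become eligible to run together once `start_task4` finishes, and `start_task2` waits for all of them. The sketch below models the same ordering in plain Python with the standard-library `graphlib` module (Python 3.9+). It does not use Airflow, and the `deps` dictionary and `execution_waves` helper are illustrative names, not part of TML.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it waits for, mirroring the >> chain above.
deps = {
    "sensor_A": {"start_task"},
    "sensor_B": {"sensor_A"},
    "start_task4": {"sensor_B"},
    "sensor_I": {"start_task4"},
    "sensor_C": {"start_task4"},
    "sensor_D": {"start_task4"},
    "sensor_E": {"start_task4"},
    "start_task2": {"sensor_I", "sensor_C", "sensor_D", "sensor_E"},
    "sensor_F": {"start_task2"},
    "start_task3": {"sensor_F"},
    "sensor_G": {"start_task3"},
}


def execution_waves(graph):
    """Group tasks into waves; tasks in one wave have no unmet dependencies."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


if __name__ == "__main__":
    for number, wave in enumerate(execution_waves(deps), start=1):
        print(number, wave)
```

Whether tasks in the same wave actually run concurrently depends on how Airflow is configured; the sketch only shows which tasks are allowed to start at each point.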

5.4. 2. Solution Template: solution_template_processing_ai_dag_mqtt.py

from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_MQTT_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")


with DAG(
    dag_id="solution_preprocessing_ai_mqtt_dag",
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
  start_task = BashOperator(
    task_id="start_tasks_tml_preprocessing_ai_mqtt",
    bash_command="echo 'Start task'",
  )
# STEP 1: Get the Parameters
  sensor_A = PythonOperator(
            task_id="step_1_solution_task_getparams",
            python_callable=step1.getparams,
            provide_context=True,
  )

# STEP 2: Create the Kafka topics
  sensor_B = PythonOperator(
      task_id="step_2_solution_task_createtopic",
      python_callable=step2.setupkafkatopics,
      provide_context=True,
  )
# STEP 3: Produce data to topic
  sensor_C = PythonOperator(
      task_id="step_3_solution_task_producetotopic",
      python_callable=step3.startproducing,
      provide_context=True,
  )
# STEP 4: Preprocess the data
  sensor_D = PythonOperator(
      task_id="step_4_solution_task_preprocess",
      python_callable=step4.dopreprocessing,
      provide_context=True,
  )
# STEP 7: Visualization
  sensor_E = PythonOperator(
      task_id="step_7_solution_task_visualization",
      python_callable=step7.startstreamingengine,
      provide_context=True,
  )
# STEP 8: Containerize the solution
  sensor_F = PythonOperator(
      task_id="step_8_solution_task_containerize",
      python_callable=step8.dockerit,
      provide_context=True,
  )
# STEP 9: PrivateGPT
  sensor_I = PythonOperator(
      task_id="step_9_solution_task_ai",
      python_callable=step9.startprivategpt,
      provide_context=True,
  )
  start_task2 = BashOperator(
    task_id="Starting_Docker",
    bash_command="echo 'Start task Completed'",
  )
  start_task3 = BashOperator(
    task_id="Starting_Documentation",
    bash_command="echo 'Start task Completed'",
  )
  start_task4 = BashOperator(
    task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
    bash_command="echo 'Start task Completed'",
  )
# STEP 10: Document the solution
  sensor_G = PythonOperator(
      task_id="step_10_solution_task_document",
      python_callable=step10.generatedoc,
      provide_context=True,
  )

  start_task >> sensor_A >> sensor_B  >> start_task4 >> [sensor_I, sensor_C, sensor_D, sensor_E] >> start_task2 >> sensor_F >> start_task3  >> sensor_G

5.5. 3. Solution Template: solution_template_processing_ai_dag_restapi.py

from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_RESTAPI_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")


with DAG(
    dag_id="solution_preprocessing_ai_restapi_dag",
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
  start_task = BashOperator(
    task_id="start_tasks_tml_preprocessing_ai_restapi",
    bash_command="echo 'Start task'",
  )
# STEP 1: Get the Parameters
  sensor_A = PythonOperator(
            task_id="step_1_solution_task_getparams",
            python_callable=step1.getparams,
            provide_context=True,
  )

# STEP 2: Create the Kafka topics
  sensor_B = PythonOperator(
      task_id="step_2_solution_task_createtopic",
      python_callable=step2.setupkafkatopics,
      provide_context=True,
  )
# STEP 3: Produce data to topic
  sensor_C = PythonOperator(
      task_id="step_3_solution_task_producetotopic",
      python_callable=step3.startproducing,
      provide_context=True,
  )
# STEP 4: Preprocess the data
  sensor_D = PythonOperator(
      task_id="step_4_solution_task_preprocess",
      python_callable=step4.dopreprocessing,
      provide_context=True,
  )
# STEP 7: Visualization
  sensor_E = PythonOperator(
      task_id="step_7_solution_task_visualization",
      python_callable=step7.startstreamingengine,
      provide_context=True,
  )
# STEP 8: Containerize the solution
  sensor_F = PythonOperator(
      task_id="step_8_solution_task_containerize",
      python_callable=step8.dockerit,
      provide_context=True,
  )
# STEP 9: PrivateGPT
  sensor_I = PythonOperator(
      task_id="step_9_solution_task_ai",
      python_callable=step9.startprivategpt,
      provide_context=True,
  )
  start_task2 = BashOperator(
    task_id="Starting_Docker",
    bash_command="echo 'Start task Completed'",
  )
  start_task3 = BashOperator(
    task_id="Starting_Documentation",
    bash_command="echo 'Start task Completed'",
  )
  start_task4 = BashOperator(
    task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
    bash_command="echo 'Start task Completed'",
  )
# STEP 10: Document the solution
  sensor_G = PythonOperator(
      task_id="step_10_solution_task_document",
      python_callable=step10.generatedoc,
      provide_context=True,
  )

  start_task >> sensor_A >> sensor_B  >> start_task4 >> [sensor_I, sensor_C, sensor_D, sensor_E] >> start_task2 >> sensor_F >> start_task3  >> sensor_G

5.6. 4. Solution Template: solution_template_processing_ai_dag.py

from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime

import importlib
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_LOCALFILE_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")


with DAG(
    dag_id="solution_preprocessing_ai_dag",
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
  start_task = BashOperator(
    task_id="start_tasks_tml_preprocessing_ai",
    bash_command="echo 'Start task'",
  )
# STEP 1: Get the Parameters
  sensor_A = PythonOperator(
            task_id="step_1_solution_task_getparams",
            python_callable=step1.getparams,
            provide_context=True,
  )

# STEP 2: Create the Kafka topics
  sensor_B = PythonOperator(
      task_id="step_2_solution_task_createtopic",
      python_callable=step2.setupkafkatopics,
      provide_context=True,
  )
# STEP 3: Produce data to topic
  sensor_C = PythonOperator(
      task_id="step_3_solution_task_producetotopic",
      python_callable=step3.startproducing,
      provide_context=True,
  )
# STEP 4: Preprocess the data
  sensor_D = PythonOperator(
      task_id="step_4_solution_task_preprocess",
      python_callable=step4.dopreprocessing,
      provide_context=True,
  )
# STEP 7: Visualization
  sensor_E = PythonOperator(
      task_id="step_7_solution_task_visualization",
      python_callable=step7.startstreamingengine,
      provide_context=True,
  )
# STEP 8: Containerize the solution
  sensor_F = PythonOperator(
      task_id="step_8_solution_task_containerize",
      python_callable=step8.dockerit,
      provide_context=True,
  )
# STEP 9: PrivateGPT
  sensor_I = PythonOperator(
      task_id="step_9_solution_task_ai",
      python_callable=step9.startprivategpt,
      provide_context=True,
  )
  start_task2 = BashOperator(
    task_id="Starting_Docker",
    bash_command="echo 'Start task Completed'",
  )
  start_task3 = BashOperator(
    task_id="Starting_Documentation",
    bash_command="echo 'Start task Completed'",
  )
  start_task4 = BashOperator(
    task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
    bash_command="echo 'Start task Completed'",
  )
# STEP 10: Document the solution
  sensor_G = PythonOperator(
      task_id="step_10_solution_task_document",
      python_callable=step10.generatedoc,
      provide_context=True,
  )

  start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_I, sensor_C, sensor_D, sensor_E] >> start_task2 >> sensor_F >> start_task3  >> sensor_G

5.7. 5. Solution Template: solution_template_processing_dag_grpc.py

from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime

import importlib
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_gRPC_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")

with DAG(
    dag_id="solution_preprocessing_dag_grpc",
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
  start_task = BashOperator(
    task_id="start_tasks_tml_preprocessing_grpc",
    bash_command="echo 'Start task'",
  )
# STEP 1: Get the Parameters
  sensor_A = PythonOperator(
            task_id="step_1_solution_task_getparams",
            python_callable=step1.getparams,
            provide_context=True,
  )

# STEP 2: Create the Kafka topics
  sensor_B = PythonOperator(
      task_id="step_2_solution_task_createtopic",
      python_callable=step2.setupkafkatopics,
      provide_context=True,
  )
# STEP 3: Produce data to topic
  sensor_C = PythonOperator(
      task_id="step_3_solution_task_producetotopic",
      python_callable=step3.startproducing,
      provide_context=True,
  )
# STEP 4: Preprocess the data
  sensor_D = PythonOperator(
      task_id="step_4_solution_task_preprocess",
      python_callable=step4.dopreprocessing,
      provide_context=True,
  )
# STEP 7: Visualization
  sensor_E = PythonOperator(
      task_id="step_7_solution_task_visualization",
      python_callable=step7.startstreamingengine,
      provide_context=True,
  )
# STEP 8: Containerize the solution
  sensor_F = PythonOperator(
      task_id="step_8_solution_task_containerize",
      python_callable=step8.dockerit,
      provide_context=True,
  )
  start_task2 = BashOperator(
    task_id="Starting_Docker",
    bash_command="echo 'Start task Completed'",
  )
  start_task3 = BashOperator(
    task_id="Starting_Documentation",
    bash_command="echo 'Start task Completed'",
  )
  start_task4 = BashOperator(
    task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
    bash_command="echo 'Start task Completed'",
  )
# STEP 10: Document the solution
  sensor_G = PythonOperator(
      task_id="step_10_solution_task_document",
      python_callable=step10.generatedoc,
      provide_context=True,
  )

  start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_E] >> start_task2 >> sensor_F >> start_task3  >> sensor_G

5.8. 6. Solution Template: solution_template_processing_dag_mqtt.py

from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime

import importlib
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_MQTT_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")

with DAG(
    dag_id="solution_preprocessing_dag_mqtt",
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
  start_task = BashOperator(
    task_id="start_tasks_tml_preprocessing_mqtt",
    bash_command="echo 'Start task'",
  )
# STEP 1: Get the Parameters
  sensor_A = PythonOperator(
            task_id="step_1_solution_task_getparams",
            python_callable=step1.getparams,
            provide_context=True,
  )

# STEP 2: Create the Kafka topics
  sensor_B = PythonOperator(
      task_id="step_2_solution_task_createtopic",
      python_callable=step2.setupkafkatopics,
      provide_context=True,
  )
# STEP 3: Produce data to topic
  sensor_C = PythonOperator(
      task_id="step_3_solution_task_producetotopic",
      python_callable=step3.startproducing,
      provide_context=True,
  )
# STEP 4: Preprocess the data
  sensor_D = PythonOperator(
      task_id="step_4_solution_task_preprocess",
      python_callable=step4.dopreprocessing,
      provide_context=True,
  )
# STEP 7: Visualization
  sensor_E = PythonOperator(
      task_id="step_7_solution_task_visualization",
      python_callable=step7.startstreamingengine,
      provide_context=True,
  )
# STEP 8: Containerize the solution
  sensor_F = PythonOperator(
      task_id="step_8_solution_task_containerize",
      python_callable=step8.dockerit,
      provide_context=True,
  )
  start_task2 = BashOperator(
    task_id="Starting_Docker",
    bash_command="echo 'Start task Completed'",
  )
  start_task3 = BashOperator(
    task_id="Starting_Documentation",
    bash_command="echo 'Start task Completed'",
  )
  start_task4 = BashOperator(
    task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
    bash_command="echo 'Start task Completed'",
  )
# STEP 10: Document the solution
  sensor_G = PythonOperator(
      task_id="step_10_solution_task_document",
      python_callable=step10.generatedoc,
      provide_context=True,
  )

  start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_E] >> start_task2 >> sensor_F >> start_task3  >> sensor_G

5.9. 7. Solution Template: solution_template_processing_dag_restapi.py

from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime

import importlib
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_RESTAPI_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")

with DAG(
    dag_id="solution_preprocessing_dag_restapi",
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
  start_task = BashOperator(
    task_id="start_tasks_tml_preprocessing_restapi",
    bash_command="echo 'Start task'",
  )
# STEP 1: Get the Parameters
  sensor_A = PythonOperator(
            task_id="step_1_solution_task_getparams",
            python_callable=step1.getparams,
            provide_context=True,
  )

# STEP 2: Create the Kafka topics
  sensor_B = PythonOperator(
      task_id="step_2_solution_task_createtopic",
      python_callable=step2.setupkafkatopics,
      provide_context=True,
  )
# STEP 3: Produce data to topic
  sensor_C = PythonOperator(
      task_id="step_3_solution_task_producetotopic",
      python_callable=step3.startproducing,
      provide_context=True,
  )
# STEP 4: Preprocess the data
  sensor_D = PythonOperator(
      task_id="step_4_solution_task_preprocess",
      python_callable=step4.dopreprocessing,
      provide_context=True,
  )
# STEP 7: Visualization
  sensor_E = PythonOperator(
      task_id="step_7_solution_task_visualization",
      python_callable=step7.startstreamingengine,
      provide_context=True,
  )
# STEP 8: Containerize the solution
  sensor_F = PythonOperator(
      task_id="step_8_solution_task_containerize",
      python_callable=step8.dockerit,
      provide_context=True,
  )
  start_task2 = BashOperator(
    task_id="Starting_Docker",
    bash_command="echo 'Start task Completed'",
  )
  start_task3 = BashOperator(
    task_id="Starting_Documentation",
    bash_command="echo 'Start task Completed'",
  )
  start_task4 = BashOperator(
    task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
    bash_command="echo 'Start task Completed'",
  )
# STEP 10: Document the solution
  sensor_G = PythonOperator(
      task_id="step_10_solution_task_document",
      python_callable=step10.generatedoc,
      provide_context=True,
  )

  start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_E] >> start_task2 >> sensor_F >> start_task3  >> sensor_G

5.10. 8. Solution Template: solution_template_processing_dag.py

from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_LOCALFILE_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")


with DAG(
    dag_id="solution_preprocessing_dag",
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
  start_task = BashOperator(
    task_id="start_tasks_tml_preprocessing",
    bash_command="echo 'Start task'",
  )
# STEP 1: Get the Parameters
  sensor_A = PythonOperator(
            task_id="step_1_solution_task_getparams",
            python_callable=step1.getparams,
            provide_context=True,
  )

# STEP 2: Create the Kafka topics
  sensor_B = PythonOperator(
      task_id="step_2_solution_task_createtopic",
      python_callable=step2.setupkafkatopics,
      provide_context=True,
  )
# STEP 3: Produce data to topic
  sensor_C = PythonOperator(
      task_id="step_3_solution_task_producetotopic",
      python_callable=step3.startproducing,
      provide_context=True,
  )
# STEP 4: Preprocess the data
  sensor_D = PythonOperator(
      task_id="step_4_solution_task_preprocess",
      python_callable=step4.dopreprocessing,
      provide_context=True,
  )
# STEP 7: Visualization
  sensor_E = PythonOperator(
      task_id="step_7_solution_task_visualization",
      python_callable=step7.startstreamingengine,
      provide_context=True,
  )
# STEP 8: Containerize the solution
  sensor_F = PythonOperator(
      task_id="step_8_solution_task_containerize",
      python_callable=step8.dockerit,
      provide_context=True,
  )
  start_task2 = BashOperator(
    task_id="Starting_Docker",
    bash_command="echo 'Start task Completed'",
  )
  start_task3 = BashOperator(
    task_id="Starting_Documentation",
    bash_command="echo 'Start task Completed'",
  )
  start_task4 = BashOperator(
    task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
    bash_command="echo 'Start task Completed'",
  )
# STEP 10: Document the solution
  sensor_G = PythonOperator(
      task_id="step_10_solution_task_document",
      python_callable=step10.generatedoc,
      provide_context=True,
  )

  start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_E] >> start_task2 >> sensor_F >> start_task3  >> sensor_G

5.11. 9. Solution Template: solution_template_processing_ml_ai_dag_grpc.py

from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime

import importlib
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_gRPC_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")


with DAG(
    dag_id="solution_preprocessing_ml_ai_grpc_dag",
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
  start_task = BashOperator(
    task_id="start_tasks_tml_preprocessing_ml_ai",
    bash_command="echo 'Start task'",
  )
# STEP 1: Get the Parameters
  sensor_A = PythonOperator(
            task_id="step_1_solution_task_getparams",
            python_callable=step1.getparams,
            provide_context=True,
  )

# STEP 2: Create the Kafka topics
  sensor_B = PythonOperator(
      task_id="step_2_solution_task_createtopic",
      python_callable=step2.setupkafkatopics,
      provide_context=True,
  )
# STEP 3: Produce data to topic
  sensor_C = PythonOperator(
      task_id="step_3_solution_task_producetotopic",
      python_callable=step3.startproducing,
      provide_context=True,
  )
# STEP 4: Preprocess the data
  sensor_D = PythonOperator(
      task_id="step_4_solution_task_preprocess",
      python_callable=step4.dopreprocessing,
      provide_context=True,
  )
# STEP 5: ML
  sensor_E = PythonOperator(
      task_id="step_5_solution_task_ml",
      python_callable=step5.startml,
      provide_context=True,
  )
# STEP 6: Predictions
  sensor_F = PythonOperator(
      task_id="step_6_solution_task_prediction",
      python_callable=step6.startpredictions,
      provide_context=True,
  )
  # STEP 7: Visualize the solution
  sensor_G = PythonOperator(
      task_id="step_7_solution_task_visualization",
      python_callable=step7.startstreamingengine,
      provide_context=True,
  )
# STEP 8: Containerize the solution
  sensor_H = PythonOperator(
      task_id="step_8_solution_task_containerize",
      python_callable=step8.dockerit,
      provide_context=True,
  )
# STEP 9: PrivateGPT
  sensor_I = PythonOperator(
      task_id="step_9_solution_task_ai",
      python_callable=step9.startprivategpt,
      provide_context=True,
  )
  start_task2 = BashOperator(
    task_id="Starting_Docker",
    bash_command="echo 'Start task Completed'",
  )
  start_task3 = BashOperator(
    task_id="Starting_Documentation",
    bash_command="echo 'Start task Completed'",
  )
  start_task4 = BashOperator(
    task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
    bash_command="echo 'Start task Completed'",
  )
# STEP 10: Document the solution
  sensor_J = PythonOperator(
      task_id="step_10_solution_task_document",
      python_callable=step10.generatedoc,
      provide_context=True,
  )

  start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_I, sensor_C, sensor_D, sensor_E, sensor_F, sensor_G] >> start_task2 >> sensor_H >> start_task3 >> sensor_J

5.12. 10. Solution Template: solution_template_processing_ml_ai_dag_mqtt.py

from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime

import importlib
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_MQTT_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")


with DAG(
    dag_id="solution_preprocessing_ml_ai_mqtt_dag",
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
  start_task = BashOperator(
    task_id="start_tasks_tml_preprocessing_ml_ai",
    bash_command="echo 'Start task'",
  )
# STEP 1: Get the Parameters
  sensor_A = PythonOperator(
            task_id="step_1_solution_task_getparams",
            python_callable=step1.getparams,
            provide_context=True,
  )

# STEP 2: Create the Kafka topics
  sensor_B = PythonOperator(
      task_id="step_2_solution_task_createtopic",
      python_callable=step2.setupkafkatopics,
      provide_context=True,
  )
# STEP 3: Produce data to topic
  sensor_C = PythonOperator(
      task_id="step_3_solution_task_producetotopic",
      python_callable=step3.startproducing,
      provide_context=True,
  )
# STEP 4: Preprocess the data
  sensor_D = PythonOperator(
      task_id="step_4_solution_task_preprocess",
      python_callable=step4.dopreprocessing,
      provide_context=True,
  )
# STEP 5: ML
  sensor_E = PythonOperator(
      task_id="step_5_solution_task_ml",
      python_callable=step5.startml,
      provide_context=True,
  )
# STEP 6: Predictions
  sensor_F = PythonOperator(
      task_id="step_6_solution_task_prediction",
      python_callable=step6.startpredictions,
      provide_context=True,
  )
  # STEP 7: Visualize the solution
  sensor_G = PythonOperator(
      task_id="step_7_solution_task_visualization",
      python_callable=step7.startstreamingengine,
      provide_context=True,
  )
# STEP 8: Containerize the solution
  sensor_H = PythonOperator(
      task_id="step_8_solution_task_containerize",
      python_callable=step8.dockerit,
      provide_context=True,
  )
# STEP 9: PrivateGPT
  sensor_I = PythonOperator(
      task_id="step_9_solution_task_ai",
      python_callable=step9.startprivategpt,
      provide_context=True,
  )
  start_task2 = BashOperator(
    task_id="Starting_Docker",
    bash_command="echo 'Start task Completed'",
  )
  start_task3 = BashOperator(
    task_id="Starting_Documentation",
    bash_command="echo 'Start task Completed'",
  )
  start_task4 = BashOperator(
    task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
    bash_command="echo 'Start task Completed'",
  )
# STEP 10: Document the solution
  sensor_J = PythonOperator(
      task_id="step_10_solution_task_document",
      python_callable=step10.generatedoc,
      provide_context=True,
  )

  start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_I, sensor_C, sensor_D, sensor_E, sensor_F, sensor_G] >> start_task2 >> sensor_H >> start_task3 >> sensor_J

5.13. 11. Solution Template: solution_template_processing_ml_ai_dag_restapi.py

from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime

import importlib
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_RESTAPI_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")


with DAG(
    dag_id="solution_preprocessing_ml_ai_restapi_dag",
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
  start_task = BashOperator(
    task_id="start_tasks_tml_preprocessing_ml_ai",
    bash_command="echo 'Start task'",
  )
# STEP 1: Get the Parameters
  sensor_A = PythonOperator(
            task_id="step_1_solution_task_getparams",
            python_callable=step1.getparams,
            provide_context=True,
  )

# STEP 2: Create the Kafka topics
  sensor_B = PythonOperator(
      task_id="step_2_solution_task_createtopic",
      python_callable=step2.setupkafkatopics,
      provide_context=True,
  )
# STEP 3: Produce data to topic
  sensor_C = PythonOperator(
      task_id="step_3_solution_task_producetotopic",
      python_callable=step3.startproducing,
      provide_context=True,
  )
# STEP 4: Preprocess the data
  sensor_D = PythonOperator(
      task_id="step_4_solution_task_preprocess",
      python_callable=step4.dopreprocessing,
      provide_context=True,
  )
# STEP 5: ML
  sensor_E = PythonOperator(
      task_id="step_5_solution_task_ml",
      python_callable=step5.startml,
      provide_context=True,
  )
# STEP 6: Predictions
  sensor_F = PythonOperator(
      task_id="step_6_solution_task_prediction",
      python_callable=step6.startpredictions,
      provide_context=True,
  )
  # STEP 7: Visualize the solution
  sensor_G = PythonOperator(
      task_id="step_7_solution_task_visualization",
      python_callable=step7.startstreamingengine,
      provide_context=True,
  )
# STEP 8: Containerize the solution
  sensor_H = PythonOperator(
      task_id="step_8_solution_task_containerize",
      python_callable=step8.dockerit,
      provide_context=True,
  )
# STEP 9: PrivateGPT
  sensor_I = PythonOperator(
      task_id="step_9_solution_task_ai",
      python_callable=step9.startprivategpt,
      provide_context=True,
  )
  start_task2 = BashOperator(
    task_id="Starting_Docker",
    bash_command="echo 'Start task Completed'",
  )
  start_task3 = BashOperator(
    task_id="Starting_Documentation",
    bash_command="echo 'Start task Completed'",
  )
  start_task4 = BashOperator(
    task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
    bash_command="echo 'Start task Completed'",
  )
# STEP 10: Document the solution
  sensor_J = PythonOperator(
      task_id="step_10_solution_task_document",
      python_callable=step10.generatedoc,
      provide_context=True,
  )

  start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_I, sensor_C, sensor_D, sensor_E, sensor_F, sensor_G] >> start_task2 >> sensor_H >> start_task3 >> sensor_J

5.14. 12. Solution Template: solution_template_processing_ml_ai_dag.py

from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime

import importlib
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_LOCALFILE_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")

with DAG(
    dag_id="solution_preprocessing_ml_ai_dag",
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
  start_task = BashOperator(
    task_id="start_tasks_tml_preprocessing",
    bash_command="echo 'Start task'",
  )
# STEP 1: Get the Parameters
  sensor_A = PythonOperator(
            task_id="step_1_solution_task_getparams",
            python_callable=step1.getparams,
            provide_context=True,
  )

# STEP 2: Create the Kafka topics
  sensor_B = PythonOperator(
      task_id="step_2_solution_task_createtopic",
      python_callable=step2.setupkafkatopics,
      provide_context=True,
  )
# STEP 3: Produce data to topic
  sensor_C = PythonOperator(
      task_id="step_3_solution_task_producetotopic",
      python_callable=step3.startproducing,
      provide_context=True,
  )
# STEP 4: Preprocess the data
  sensor_D = PythonOperator(
      task_id="step_4_solution_task_preprocess",
      python_callable=step4.dopreprocessing,
      provide_context=True,
  )
# STEP 5: ML
  sensor_E = PythonOperator(
      task_id="step_5_solution_task_ml",
      python_callable=step5.startml,
      provide_context=True,
  )
# STEP 6: Predictions
  sensor_F = PythonOperator(
      task_id="step_6_solution_task_prediction",
      python_callable=step6.startpredictions,
      provide_context=True,
  )

# STEP 7: Visualize the solution
  sensor_G = PythonOperator(
      task_id="step_7_solution_task_visualization",
      python_callable=step7.startstreamingengine,
      provide_context=True,
  )
# STEP 8: Containerize the solution
  sensor_H = PythonOperator(
      task_id="step_8_solution_task_containerize",
      python_callable=step8.dockerit,
      provide_context=True,
  )
# STEP 9: PrivateGPT
  sensor_I = PythonOperator(
      task_id="step_9_solution_task_ai",
      python_callable=step9.startprivategpt,
      provide_context=True,
  )
  start_task2 = BashOperator(
    task_id="Starting_Docker",
    bash_command="echo 'Start task Completed'",
  )
  start_task3 = BashOperator(
    task_id="Starting_Documentation",
    bash_command="echo 'Start task Completed'",
  )
  start_task4 = BashOperator(
    task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
    bash_command="echo 'Start task Completed'",
  )
# STEP 10: Document the solution
  sensor_J = PythonOperator(
      task_id="step_10_solution_task_document",
      python_callable=step10.generatedoc,
      provide_context=True,
  )

  start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_I, sensor_C, sensor_D, sensor_E, sensor_F, sensor_G] >> start_task2 >> sensor_H >> start_task3 >> sensor_J

5.15. 13. Solution Template: solution_template_processing_ml_dag_grpc.py

  from __future__ import annotations

  import pendulum
  from airflow.decorators import task
  from airflow.models.dag import DAG
  from airflow.operators.bash import BashOperator
  from airflow.sensors.external_task import ExternalTaskSensor
  import tsslogging
  import os
  from datetime import datetime

  import importlib
  from airflow.operators.python import (
      ExternalPythonOperator,
      PythonOperator
  )
  step1 = importlib.import_module("tml_system_step_1_getparams_dag")
  step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
  step3 = importlib.import_module("tml_read_gRPC_step_3_kafka_producetotopic_dag")
  step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
  step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
  step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
  step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
  step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
  step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
  step10 = importlib.import_module("tml_system_step_10_documentation_dag")


  with DAG(
      dag_id="solution_preprocessing_ml_grpc_dag",
      start_date=datetime(2023, 1, 1),
      schedule=None,
  ) as dag:
    start_task = BashOperator(
      task_id="start_tasks_tml_preprocessing_ml_grpc",
      bash_command="echo 'Start task'",
    )
  # STEP 1: Get the Parameters
    sensor_A = PythonOperator(
              task_id="step_1_solution_task_getparams",
              python_callable=step1.getparams,
              provide_context=True,
    )

  # STEP 2: Create the Kafka topics
    sensor_B = PythonOperator(
        task_id="step_2_solution_task_createtopic",
        python_callable=step2.setupkafkatopics,
        provide_context=True,
    )
  # STEP 3: Produce data to topic
    sensor_C = PythonOperator(
        task_id="step_3_solution_task_producetotopic",
        python_callable=step3.startproducing,
        provide_context=True,
    )
  # STEP 4: Preprocess the data
    sensor_D = PythonOperator(
        task_id="step_4_solution_task_preprocess",
        python_callable=step4.dopreprocessing,
        provide_context=True,
    )
  # STEP 5: ML
    sensor_E = PythonOperator(
        task_id="step_5_solution_task_ml",
        python_callable=step5.startml,
        provide_context=True,
    )
  # STEP 6: Predictions
    sensor_F = PythonOperator(
        task_id="step_6_solution_task_prediction",
        python_callable=step6.startpredictions,
        provide_context=True,
    )

  # STEP 7: Visualize the solution
    sensor_G = PythonOperator(
        task_id="step_7_solution_task_visualization",
        python_callable=step7.startstreamingengine,
        provide_context=True,
    )
  # STEP 8: Containerize the solution
    sensor_H = PythonOperator(
        task_id="step_8_solution_task_containerize",
        python_callable=step8.dockerit,
        provide_context=True,
    )
    start_task2 = BashOperator(
      task_id="Starting_Docker",
      bash_command="echo 'Start task Completed'",
    )
    start_task3 = BashOperator(
      task_id="Starting_Documentation",
      bash_command="echo 'Start task Completed'",
    )
    start_task4 = BashOperator(
      task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
      bash_command="echo 'Start task Completed'",
    )
  # STEP 10: Document the solution
    sensor_J = PythonOperator(
        task_id="step_10_solution_task_document",
        python_callable=step10.generatedoc,
        provide_context=True,
    )

    start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_E, sensor_F, sensor_G] >> start_task2 >> sensor_H >> start_task3 >> sensor_J

5.16. 14. Solution Template: solution_template_processing_ml_dag_mqtt.py

from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime

import importlib
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_MQTT_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")


with DAG(
    dag_id="solution_preprocessing_ml_mqtt_dag",
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
  start_task = BashOperator(
    task_id="start_tasks_tml_preprocessing_ml_mqtt",
    bash_command="echo 'Start task'",
  )
# STEP 1: Get the Parameters
  sensor_A = PythonOperator(
            task_id="step_1_solution_task_getparams",
            python_callable=step1.getparams,
            provide_context=True,
  )

# STEP 2: Create the Kafka topics
  sensor_B = PythonOperator(
      task_id="step_2_solution_task_createtopic",
      python_callable=step2.setupkafkatopics,
      provide_context=True,
  )
# STEP 3: Produce data to topic
  sensor_C = PythonOperator(
      task_id="step_3_solution_task_producetotopic",
      python_callable=step3.startproducing,
      provide_context=True,
  )
# STEP 4: Preprocess the data
  sensor_D = PythonOperator(
      task_id="step_4_solution_task_preprocess",
      python_callable=step4.dopreprocessing,
      provide_context=True,
  )
# STEP 5: ML
  sensor_E = PythonOperator(
      task_id="step_5_solution_task_ml",
      python_callable=step5.startml,
      provide_context=True,
  )
# STEP 6: Predictions
  sensor_F = PythonOperator(
      task_id="step_6_solution_task_prediction",
      python_callable=step6.startpredictions,
      provide_context=True,
  )

# STEP 7: Visualize the solution
  sensor_G = PythonOperator(
      task_id="step_7_solution_task_visualization",
      python_callable=step7.startstreamingengine,
      provide_context=True,
  )
# STEP 8: Containerize the solution
  sensor_H = PythonOperator(
      task_id="step_8_solution_task_containerize",
      python_callable=step8.dockerit,
      provide_context=True,
  )
  start_task2 = BashOperator(
    task_id="Starting_Docker",
    bash_command="echo 'Start task Completed'",
  )
  start_task3 = BashOperator(
    task_id="Starting_Documentation",
    bash_command="echo 'Start task Completed'",
  )
  start_task4 = BashOperator(
    task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
    bash_command="echo 'Start task Completed'",
  )
# STEP 10: Document the solution
  sensor_J = PythonOperator(
      task_id="step_10_solution_task_document",
      python_callable=step10.generatedoc,
      provide_context=True,
  )

  start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_E, sensor_F, sensor_G] >> start_task2 >> sensor_H >> start_task3 >> sensor_J

5.17. 15. Solution Template: solution_template_processing_ml_dag_restapi.py

from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime

import importlib
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_RESTAPI_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")


with DAG(
    dag_id="solution_preprocessing_ml_restapi_dag",
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
  start_task = BashOperator(
    task_id="start_tasks_tml_preprocessing_ml_restapi",
    bash_command="echo 'Start task'",
  )
# STEP 1: Get the Parameters
  sensor_A = PythonOperator(
            task_id="step_1_solution_task_getparams",
            python_callable=step1.getparams,
            provide_context=True,
  )

# STEP 2: Create the Kafka topics
  sensor_B = PythonOperator(
      task_id="step_2_solution_task_createtopic",
      python_callable=step2.setupkafkatopics,
      provide_context=True,
  )
# STEP 3: Produce data to topic
  sensor_C = PythonOperator(
      task_id="step_3_solution_task_producetotopic",
      python_callable=step3.startproducing,
      provide_context=True,
  )
# STEP 4: Preprocess the data
  sensor_D = PythonOperator(
      task_id="step_4_solution_task_preprocess",
      python_callable=step4.dopreprocessing,
      provide_context=True,
  )
# STEP 5: ML
  sensor_E = PythonOperator(
      task_id="step_5_solution_task_ml",
      python_callable=step5.startml,
      provide_context=True,
  )
# STEP 6: Predictions
  sensor_F = PythonOperator(
      task_id="step_6_solution_task_prediction",
      python_callable=step6.startpredictions,
      provide_context=True,
  )

# STEP 7: Visualize the solution
  sensor_G = PythonOperator(
      task_id="step_7_solution_task_visualization",
      python_callable=step7.startstreamingengine,
      provide_context=True,
  )
# STEP 8: Containerize the solution
  sensor_H = PythonOperator(
      task_id="step_8_solution_task_containerize",
      python_callable=step8.dockerit,
      provide_context=True,
  )
  start_task2 = BashOperator(
    task_id="Starting_Docker",
    bash_command="echo 'Start task Completed'",
  )
  start_task3 = BashOperator(
    task_id="Starting_Documentation",
    bash_command="echo 'Start task Completed'",
  )
  start_task4 = BashOperator(
    task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
    bash_command="echo 'Start task Completed'",
  )
# STEP 10: Document the solution
  sensor_J = PythonOperator(
      task_id="step_10_solution_task_document",
      python_callable=step10.generatedoc,
      provide_context=True,
  )

  start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_E, sensor_F, sensor_G] >> start_task2 >> sensor_H >> start_task3 >> sensor_J

5.18. 16. Solution Template: solution_template_processing_ml_dag.py

from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator
)
step1 = importlib.import_module("tml_system_step_1_getparams_dag")
step2 = importlib.import_module("tml_system_step_2_kafka_createtopic_dag")
step3 = importlib.import_module("tml_read_LOCALFILE_step_3_kafka_producetotopic_dag")
step4 = importlib.import_module("tml_system_step_4_kafka_preprocess_dag")
step5 = importlib.import_module("tml_system_step_5_kafka_machine_learning_dag")
step6 = importlib.import_module("tml_system_step_6_kafka_predictions_dag")
step7 = importlib.import_module("tml_system_step_7_kafka_visualization_dag")
step8 = importlib.import_module("tml_system_step_8_deploy_solution_to_docker_dag")
step9 = importlib.import_module("tml_system_step_9_privategpt_qdrant_dag")
step10 = importlib.import_module("tml_system_step_10_documentation_dag")

with DAG(
    dag_id="solution_preprocessing_ml_dag",
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
  start_task = BashOperator(
    task_id="start_tasks_tml_preprocessing_ml",
    bash_command="echo 'Start task'",
  )
# STEP 1: Get the Parameters
  sensor_A = PythonOperator(
            task_id="step_1_solution_task_getparams",
            python_callable=step1.getparams,
            provide_context=True,
  )

# STEP 2: Create the Kafka topics
  sensor_B = PythonOperator(
      task_id="step_2_solution_task_createtopic",
      python_callable=step2.setupkafkatopics,
      provide_context=True,
  )
# STEP 3: Produce data to topic
  sensor_C = PythonOperator(
      task_id="step_3_solution_task_producetotopic",
      python_callable=step3.startproducing,
      provide_context=True,
  )
# STEP 4: Preprocess the data
  sensor_D = PythonOperator(
      task_id="step_4_solution_task_preprocess",
      python_callable=step4.dopreprocessing,
      provide_context=True,
  )
# STEP 5: ML
  sensor_E = PythonOperator(
      task_id="step_5_solution_task_ml",
      python_callable=step5.startml,
      provide_context=True,
  )
# STEP 6: Predictions
  sensor_F = PythonOperator(
      task_id="step_6_solution_task_prediction",
      python_callable=step6.startpredictions,
      provide_context=True,
  )

# STEP 7: Visualize the solution
  sensor_G = PythonOperator(
      task_id="step_7_solution_task_visualization",
      python_callable=step7.startstreamingengine,
      provide_context=True,
  )
# STEP 8: Containerize the solution
  sensor_H = PythonOperator(
      task_id="step_8_solution_task_containerize",
      python_callable=step8.dockerit,
      provide_context=True,
  )
  start_task2 = BashOperator(
    task_id="Starting_Docker",
    bash_command="echo 'Start task Completed'",
  )
  start_task3 = BashOperator(
    task_id="Starting_Documentation",
    bash_command="echo 'Start task Completed'",
  )
  start_task4 = BashOperator(
    task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
    bash_command="echo 'Start task Completed'",
  )
# STEP 10: Document the solution
  sensor_J = PythonOperator(
      task_id="step_10_solution_task_document",
      python_callable=step10.generatedoc,
      provide_context=True,
  )

  start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_E, sensor_F, sensor_G] >> start_task2 >> sensor_H >> start_task3 >> sensor_J
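
In the templates shown above, the step 3 import is the only module name that changes with the ingestion method; the other step modules are identical from one template to the next. The sketch below captures that mapping in a small lookup. The module names are copied from the templates, but the dictionary keys and the helper `step3_module_name` are hypothetical names introduced here for illustration and are not part of TML.

```python
# Module names are taken from the templates above; the keys are illustrative.
STEP3_MODULES = {
    "localfile": "tml_read_LOCALFILE_step_3_kafka_producetotopic_dag",
    "mqtt": "tml_read_MQTT_step_3_kafka_producetotopic_dag",
    "grpc": "tml_read_gRPC_step_3_kafka_producetotopic_dag",
    "restapi": "tml_read_RESTAPI_step_3_kafka_producetotopic_dag",
}


def step3_module_name(source: str = "localfile") -> str:
    """Return the step 3 module name for an ingestion source (hypothetical helper)."""
    key = source.strip().lower()
    if key not in STEP3_MODULES:
        raise ValueError(f"unknown ingestion source: {source!r}")
    return STEP3_MODULES[key]


for source in ("localfile", "MQTT", "gRPC", "RESTAPI"):
    print(source, "->", step3_module_name(source))
```

The default of "localfile" mirrors the naming convention, where a template without MQTT, gRPC or REST in its name reads from a local file.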

5.19. How To Read a Solution Template

5.19.1. 1. import files

  • Consider the following import files:

    • step1 = importlib.import_module("tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_1_getparams_dag-myawesometmlsolutionml-3f10")

    • step2 = importlib.import_module("tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_2_kafka_createtopic_dag-myawesometmlsolutionml-3f10")

    • step3 = importlib.import_module("tml-solutions.myawesometmlsolutionml-3f10.tml_read_LOCALFILE_step_3_kafka_producetotopic_dag-myawesometmlsolutionml-3f10")

    • step4 = importlib.import_module("tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_4_kafka_preprocess_dag-myawesometmlsolutionml-3f10")

    • step5 = importlib.import_module("tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_5_kafka_machine_learning_dag-myawesometmlsolutionml-3f10")

    • step6 = importlib.import_module("tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_6_kafka_predictions_dag-myawesometmlsolutionml-3f10")

    • step7 = importlib.import_module("tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_7_kafka_visualization_dag-myawesometmlsolutionml-3f10")

    • step8 = importlib.import_module("tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_8_deploy_solution_to_docker_dag-myawesometmlsolutionml-3f10")

    • step9 = importlib.import_module("tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_9_privategpt_qdrant_dag-myawesometmlsolutionml-3f10")

    • step10 = importlib.import_module("tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_10_documentation_dag-myawesometmlsolutionml-3f10")

Each of these steps holds the code definition of one TML DAG. For example:

step1 = importlib.import_module("tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_1_getparams_dag-myawesometmlsolutionml-3f10")

The module name tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_1_getparams_dag-myawesometmlsolutionml-3f10 refers to a Python file located at:

<your repo>/tml-airflow/dags/tml-solutions/myawesometmlsolutionml-3f10/tml_system_step_1_getparams_dag-myawesometmlsolutionml-3f10.py

For this example, the file is here: https://github.com/smaurice101/raspberrypi/blob/main/tml-airflow/dags/tml-solutions/myawesometmlsolutionml-3f10/tml_system_step_1_getparams_dag-myawesometmlsolutionml-3f10.py
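
The folder and file names in this example contain hyphens, so a plain `import` statement cannot name them, while `importlib.import_module` accepts the dotted name as a string. The following is a minimal sketch of that mechanism, assuming a throwaway directory tree built in a temporary folder. The project name `demoproject-0000` and the body of `getparams` are hypothetical placeholders, not the real TML code.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Hypothetical project name that follows the "<name>-<suffix>" pattern.
PROJECT = "demoproject-0000"


def build_demo_tree(root: Path) -> None:
    """Create tml-solutions/<PROJECT>/tml_system_step_1_getparams_dag-<PROJECT>.py."""
    package_dir = root / "tml-solutions" / PROJECT
    package_dir.mkdir(parents=True)
    module_file = package_dir / f"tml_system_step_1_getparams_dag-{PROJECT}.py"
    # Placeholder function body; the real step 1 DAG does far more than this.
    module_file.write_text("def getparams(**context):\n    return 'params loaded'\n")


def load_step1(root: Path):
    """Import the hyphenated module by its dotted name."""
    if str(root) not in sys.path:
        sys.path.insert(0, str(root))
    importlib.invalidate_caches()  # make sure the new files are seen
    return importlib.import_module(
        f"tml-solutions.{PROJECT}.tml_system_step_1_getparams_dag-{PROJECT}"
    )


demo_root = Path(tempfile.mkdtemp())
build_demo_tree(demo_root)
step1 = load_step1(demo_root)
result = step1.getparams()
print(result)
```

Each dot in the module name corresponds to one folder level below a directory on the Python path, which in this sketch is the temporary folder.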

5.19.2. 2. tasks to execute

Continuing the above example, we can create the task for Step 1:

  # STEP 1: Get the Parameters
  sensor_A = PythonOperator(
      task_id="step_1_solution_task_getparams",
      python_callable=step1.getparams,
      provide_context=True,
  )

The line python_callable=step1.getparams tells Airflow to call the getparams function defined in the step1 module when the task runs.
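
Note that the function is passed without parentheses: the task stores a reference to it and calls it later, when the task runs. The sketch below illustrates this idea with a hypothetical stand-in class, `SketchOperator`, so that it runs without Airflow installed. It is not Airflow's `PythonOperator`, and the `getparams` function and its context keys are placeholders.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class SketchOperator:
    """Hypothetical stand-in for an operator: holds a callable, runs it on demand."""

    task_id: str
    python_callable: Callable[..., Any]

    def execute(self, context: dict) -> Any:
        # The stored function is only called here, at run time.
        return self.python_callable(**context)


def getparams(**context):
    # Placeholder for step1.getparams; the real function reads the TML parameters.
    return f"params for run {context.get('run_id')}"


sensor_A = SketchOperator(
    task_id="step_1_solution_task_getparams",
    python_callable=getparams,  # a reference, not a call
)
result = sensor_A.execute({"run_id": "manual_1"})
print(sensor_A.task_id, "->", result)
```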

5.19.3. 3. tasks groupings

We can now group the tasks into a complete end-to-end TML solution:

start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_E, sensor_F, sensor_G] >> start_task2 >> sensor_H >> start_task3 >> sensor_J

Note

Tasks chained with >> run sequentially, from left to right. The tasks in the square brackets, [sensor_C, sensor_D, sensor_E, sensor_F, sensor_G], run in parallel after start_task4 completes. The entire process is shown in the graph below.

_images/mlgraph.png
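
To make the ordering concrete, the sketch below rebuilds the same chain with a hypothetical stand-in class, `SketchTask`, that only records which tasks each task waits for. It is an illustration of how the `>>` chain fans out to the bracketed list and back in, assuming nothing about Airflow's internals, and it runs without Airflow installed.

```python
class SketchTask:
    """Hypothetical stand-in for a task; records upstream dependencies only."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()

    def __rshift__(self, other):
        # task >> other: every task in `other` waits for this task.
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            target.upstream.add(self.task_id)
        return other

    def __rrshift__(self, others):
        # [a, b] >> task: this task waits for every task in the list.
        for task in others:
            self.upstream.add(task.task_id)
        return self


names = [
    "start_task", "sensor_A", "sensor_B", "start_task4",
    "sensor_C", "sensor_D", "sensor_E", "sensor_F", "sensor_G",
    "start_task2", "sensor_H", "start_task3", "sensor_J",
]
t = {name: SketchTask(name) for name in names}
parallel = [t[n] for n in ("sensor_C", "sensor_D", "sensor_E", "sensor_F", "sensor_G")]

# Same shape as the chain in the solution template.
(t["start_task"] >> t["sensor_A"] >> t["sensor_B"] >> t["start_task4"]
 >> parallel >> t["start_task2"] >> t["sensor_H"] >> t["start_task3"] >> t["sensor_J"])

for name in names:
    print(name, "waits for", sorted(t[name].upstream))
```

In this sketch every bracketed task waits only for start_task4, and start_task2 waits for all five of them, which matches the fan-out and fan-in shape of the graph.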

5.20. Running A Solution Container

Follow the instructions in Lets Start Building a TML Solution.