9. TML Solution Examples

All examples simply follow the steps here Lets Start Building a TML Solution

Tip

You can watch the development and running of the examples below in this Youtube video.

TSS Starter Examples

  1. Real-Time IoT Data Preprocessing Example

This will process IoT data for anomalies, stream the results to a dashboard,

build and push the container to docker hub, automatically create the

solution documentation on Readthdocs, and auto-commit your solution

to GitHub.

Data Used is here (unzip the file): Demo Data

  1. Real-Time IoT Data Preprocessing With Secondary Preprocessing and With Map Example

This will process IoT data for anomalies, stream the results to a dashboard,

but will also perform a secondary processing using

STEP 4b: Preprocesing 2 Data: tml-system-step-4b-kafka-preprocess-dag

to build and push the container to docker hub, automatically create the

solution documentation on Readthdocs, and auto-commit your solution

to GitHub.

2b. Real-Time IoT Data Preprocessing With RESTAPI, Secondary Preprocessing and With Map Example

This solution is the same as 2, BUT this will solution will ingest real-time data from a RESTAPI client.

To run this solution you must:

  1. Have this solution running in Kubernetes

  2. Download the IoTData.zip demo data

  3. Download and Run the client RESTAPI file

  4. Store these files to your local computer

2c. Real-Time IoT Data Preprocessing With gRPC, Secondary Preprocessing and With Map Example

This solution is the same as 2, BUT this will solution will ingest real-time data from a gRPC client.

To run this solution you must:

  1. Have this solution running in Kubernetes

  2. Download the IoTData.zip demo data

  3. Download and Run the client gRPC file

  4. Store these files to your local computer

  1. Real-Time IoT Data Preprocessing and Machine Learning Example

This will process IoT data, and perform entity level machine learning (TML)

on the data, stream the results to a dashboard,

build and push the container to docker hub, automatically create the

solution documentation on Readthdocs, and auto-commit your solution

to GitHub.

Data Used is here (unzip the file): Demo Data

  1. Cybersecurity Solution with PrivateGPT, MQTT, HiveMQ

This will process cybersecurity data for anomalies,

and further process the TML anomalies

data by sending the results to privateGPT container. It will stream

the results to a dashboard,

build and push the container to docker hub, automatically create the

solution documentation on Readthdocs, and auto-commit your solution

to GitHub.

MQTT Client File here is: MQTT Client

Data Used is here (unzip the file): Demo Data

4b. To Scale this Solution Watch the YouTube Video: here

Note

While we are using a local file for the demos, this is to make it easy for users to build and run these solutions themselves. Ideally, in industry, we would use APIs like MQTT, REST and gRPC to ingest data from devices and stream it to TML solutions. TSS allows you to build solutions with these APIs with pre-written DAGs here:

9.1. Real-Time IoT Data Preprocessing Example

Solution DAG Code: solution_preprocessing_dag-myawesometmlsolution-3f10

This IoT Data Preprocessing Solution DAG: solution_preprocessing_dag-myawesometmlsolution-3f10 reads local file data in /rawdata/IoTdata.txt and streams it to Kafka. The streaming data are then processed with TML binary Viper and the output data are streamed to a browser that runs the dashboard: dashboard.html that is located in /Viperviz/viperviz/views.

The solution will automatically build and push the solution container to docker hub, automatically create documentation on READTHEDOCS.io and automatically commits your solution code to Github, all in about 2 minutes.

_images/soldags1.png

9.1.1. Solution DAG Code: solution_preprocessing_dag-myawesometmlsolution-3f10

The Python code below is the code representtion for the figure. This code builds the entire end-end TML solution in about 2 minutes.

from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator
)
step1 = importlib.import_module("tml-solutions.myawesometmlsolution-3f10.tml_system_step_1_getparams_dag-myawesometmlsolution-3f10")
step2 = importlib.import_module("tml-solutions.myawesometmlsolution-3f10.tml_system_step_2_kafka_createtopic_dag-myawesometmlsolution-3f10")
step3 = importlib.import_module("tml-solutions.myawesometmlsolution-3f10.tml_read_LOCALFILE_step_3_kafka_producetotopic_dag-myawesometmlsolution-3f10")
step4 = importlib.import_module("tml-solutions.myawesometmlsolution-3f10.tml_system_step_4_kafka_preprocess_dag-myawesometmlsolution-3f10")
step5 = importlib.import_module("tml-solutions.myawesometmlsolution-3f10.tml_system_step_5_kafka_machine_learning_dag-myawesometmlsolution-3f10")
step6 = importlib.import_module("tml-solutions.myawesometmlsolution-3f10.tml_system_step_6_kafka_predictions_dag-myawesometmlsolution-3f10")
step7 = importlib.import_module("tml-solutions.myawesometmlsolution-3f10.tml_system_step_7_kafka_visualization_dag-myawesometmlsolution-3f10")
step8 = importlib.import_module("tml-solutions.myawesometmlsolution-3f10.tml_system_step_8_deploy_solution_to_docker_dag-myawesometmlsolution-3f10")
step9 = importlib.import_module("tml-solutions.myawesometmlsolution-3f10.tml_system_step_9_privategpt_qdrant_dag-myawesometmlsolution-3f10")
step10 = importlib.import_module("tml-solutions.myawesometmlsolution-3f10.tml_system_step_10_documentation_dag-myawesometmlsolution-3f10")


with DAG(
    dag_id="solution_preprocessing_dag-myawesometmlsolution-3f10",
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
  start_task = BashOperator(
    task_id="start_tasks_tml_preprocessing",
    bash_command="echo 'Start task'",
  )
# STEP 1: Get the Parameters
  sensor_A = PythonOperator(
            task_id="step_1_solution_task_getparams",
            python_callable=step1.getparams,
            provide_context=True,
  )

# STEP 2: Create the Kafka topics
  sensor_B = PythonOperator(
      task_id="step_2_solution_task_createtopic",
      python_callable=step2.setupkafkatopics,
      provide_context=True,
  )
# STEP 3: Produce data to topic
  sensor_C = PythonOperator(
      task_id="step_3_solution_task_producetotopic",
      python_callable=step3.startproducing,
      provide_context=True,
  )
# STEP 4: Preprocess the data
  sensor_D = PythonOperator(
      task_id="step_4_solution_task_preprocess",
      python_callable=step4.dopreprocessing,
      provide_context=True,
  )
# STEP 7: Containerize the solution
  sensor_E = PythonOperator(
      task_id="step_7_solution_task_visualization",
      python_callable=step7.startstreamingengine,
      provide_context=True,
  )
# STEP 8: Containerize the solution
  sensor_F = PythonOperator(
      task_id="step_8_solution_task_containerize",
      python_callable=step8.dockerit,
      provide_context=True,
  )
  start_task2 = BashOperator(
    task_id="Starting_Docker",
    bash_command="echo 'Start task Completed'",
  )
  start_task3 = BashOperator(
    task_id="Starting_Documentation",
    bash_command="echo 'Start task Completed'",
  )
  start_task4 = BashOperator(
    task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
    bash_command="echo 'Start task Completed'",
  )
# STEP 10: Document the solution
  sensor_G = PythonOperator(
      task_id="step_10_solution_task_document",
      python_callable=step10.generatedoc,
      provide_context=True,
  )

  start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_E] >> start_task2 >> sensor_F >> start_task3  >> sensor_G

9.1.1.1. TML Dag Parameter Changes To Be Made For: solution_preprocessing_dag-myawesometmlsolution-3f10

Note

No TML Dag parameter changes are needed for this solution.

9.1.1.2. Successful Run Screen

Below the TSS/Airflow screen that shows a successful TML solution build. All colors should be green for all of the steps. If you see a red color, it means your DAG has an error.

_images/p53.png

9.2. Solution Documentation Example

This is the solution documentation that is auto-generated by TSS. Every TML solution you create will have its own auto-generated documentation that will provide details on the entire solution.

_images/sp1.png

Important

You will need to run the solution in your own TSS environment for the links to work in this documentation. It is provided as an example of the powerful capabilities of TSS: https://myawesometmlsolution-3f10.readthedocs.io/

Here is the Solution Real-Time Dashboard:

_images/sp4.png

Here is the Solution Docker Run container:

_images/sp6.png

The entire end-end real-time solution took less than 2 minutes to build:

_images/sp7.png

9.3. Github Commits

_images/sp9.png

9.4. Real-Time IoT Data Preprocessing With Secondary Preprocessing and With Map Example

Solution DAG Code: solution_preprocessing_dag-iotsolution-3f10

This IoT Data Preprocessing Solution DAG: solution_preprocessing_dag-myawesometmlsolution-3f10 reads local file data in /rawdata/IoTdata.txt and streams it to Kafka. The streaming data are then processed TWICE with TML binary Viper and the output data are streamed to a browser that runs the dashboard with a map: iot-failure-seneca.html that is located in /Viperviz/viperviz/views.

The solution will automatically build and push the solution container to docker hub, automatically create documentation on READTHEDOCS.io and automatically commits your solution code to Github, all in about 2 minutes.

9.4.1. Solution DAG Code: solution_preprocessing_dag-iotsolution-3f10

from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator
)
step1 = importlib.import_module("tml-solutions.iotsolution-3f10.tml_system_step_1_getparams_dag-iotsolution-3f10")
step2 = importlib.import_module("tml-solutions.iotsolution-3f10.tml_system_step_2_kafka_createtopic_dag-iotsolution-3f10")
step3 = importlib.import_module("tml-solutions.iotsolution-3f10.tml_read_LOCALFILE_step_3_kafka_producetotopic_dag-iotsolution-3f10")
step4 = importlib.import_module("tml-solutions.iotsolution-3f10.tml_system_step_4_kafka_preprocess_dag-iotsolution-3f10")
step4b = importlib.import_module("tml-solutions.iotsolution-3f10.tml_system_step_4b_kafka_preprocess_dag-iotsolution-3f10")

step5 = importlib.import_module("tml-solutions.iotsolution-3f10.tml_system_step_5_kafka_machine_learning_dag-iotsolution-3f10")
step6 = importlib.import_module("tml-solutions.iotsolution-3f10.tml_system_step_6_kafka_predictions_dag-iotsolution-3f10")
step7 = importlib.import_module("tml-solutions.iotsolution-3f10.tml_system_step_7_kafka_visualization_dag-iotsolution-3f10")
step8 = importlib.import_module("tml-solutions.iotsolution-3f10.tml_system_step_8_deploy_solution_to_docker_dag-iotsolution-3f10")
step9 = importlib.import_module("tml-solutions.iotsolution-3f10.tml_system_step_9_privategpt_qdrant_dag-iotsolution-3f10")
step10 = importlib.import_module("tml-solutions.iotsolution-3f10.tml_system_step_10_documentation_dag-iotsolution-3f10")


with DAG(
    dag_id="solution_preprocessing_dag-iotsolution-3f10",
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
  start_task = BashOperator(
    task_id="start_tasks_tml_preprocessing",
    bash_command="echo 'Start task'",
  )
# STEP 1: Get the Parameters
  sensor_A = PythonOperator(
            task_id="step_1_solution_task_getparams",
            python_callable=step1.getparams,
            provide_context=True,
  )

# STEP 2: Create the Kafka topics
  sensor_B = PythonOperator(
      task_id="step_2_solution_task_createtopic",
      python_callable=step2.setupkafkatopics,
      provide_context=True,
  )
# STEP 3: Produce data to topic
  sensor_C = PythonOperator(
      task_id="step_3_solution_task_producetotopic",
      python_callable=step3.startproducing,
      provide_context=True,
  )
# STEP 4: Preprocess the data
  sensor_D = PythonOperator(
      task_id="step_4_solution_task_preprocess",
      python_callable=step4.dopreprocessing,
      provide_context=True,
  )
    # STEP 4b: Preprocess the data
  sensor_D2 = PythonOperator(
      task_id="step_4b_solution_task_preprocess",
      python_callable=step4b.dopreprocessing,
      provide_context=True,
  )
# STEP 7: Containerize the solution
  sensor_E = PythonOperator(
      task_id="step_7_solution_task_visualization",
      python_callable=step7.startstreamingengine,
      provide_context=True,
  )
# STEP 8: Containerize the solution
  sensor_F = PythonOperator(
      task_id="step_8_solution_task_containerize",
      python_callable=step8.dockerit,
      provide_context=True,
  )
  start_task2 = BashOperator(
    task_id="Starting_Docker",
    bash_command="echo 'Start task Completed'",
  )
  start_task3 = BashOperator(
    task_id="Starting_Documentation",
    bash_command="echo 'Start task Completed'",
  )
  start_task4 = BashOperator(
    task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
    bash_command="echo 'Start task Completed'",
  )
# STEP 10: Document the solution
  sensor_G = PythonOperator(
      task_id="step_10_solution_task_document",
      python_callable=step10.generatedoc,
      provide_context=True,
  )

  start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_D2, sensor_E] >> start_task2 >> sensor_F >> start_task3  >> sensor_G

9.4.1.1. TML Dag Parameter Changes To Be Made For: solution_preprocessing_dag-iotsolution-3f10

Note

No TML Dag parameter changes are needed for this solution.

9.4.1.2. Successful Run Screen

Below the TSS/Airflow screen that shows a successful TML solution build. All colors should be green for all of the steps. If you see a red color, it means your DAG has an error.

_images/iotsol1.png

9.5. Solution Documentation Example

This is the solution documentation that is auto-generated by TSS. Every TML solution you create will have its own auto-generated documentation that will provide details on the entire solution.

_images/iotsol2.png

Important

You will need to run the solution in your own TSS environment for the links to work in this documentation. It is provided as an example of the powerful capabilities of TSS: https://iotsolution-3f10.readthedocs.io/

Here is the Solution Real-Time Dashboard:

_images/iotsol4.png

Here is the Solution Docker Run container:

_images/iotsol3.png

9.6. Real-Time IoT Data Preprocessing With RESTAPI, Secondary Preprocessing and With Map Example

Solution DAG Code: solution_preprocessing_dag_restapi-iotsolution-restapi-3f10

This IoT Data Preprocessing Solution DAG: solution_preprocessing_dag_restapi-iotsolution-restapi-3f10 ingest data by REST API. Users can stream data using the client REST file into Kubernetes.

The streaming data are then processed TWICE with TML binary Viper and the output data are streamed to a browser that runs the dashboard with a map: iot-failure-seneca.html that is located in /Viperviz/viperviz/views.

The solution will automatically build and push the solution container to docker hub, automatically create documentation on READTHEDOCS.io and automatically commits your solution code to Github, all in about 2 minutes.

9.6.1. Solution DAG Code: solution_preprocessing_dag_restapi-iotsolution-restapi-3f10

from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime

import importlib
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator
)
step1 = importlib.import_module("tml-solutions.iotsolution-restapi-3f10.tml_system_step_1_getparams_dag-iotsolution-restapi-3f10")
step2 = importlib.import_module("tml-solutions.iotsolution-restapi-3f10.tml_system_step_2_kafka_createtopic_dag-iotsolution-restapi-3f10")
step3 = importlib.import_module("tml-solutions.iotsolution-restapi-3f10.tml_read_RESTAPI_step_3_kafka_producetotopic_dag-iotsolution-restapi-3f10")
step4 = importlib.import_module("tml-solutions.iotsolution-restapi-3f10.tml_system_step_4_kafka_preprocess_dag-iotsolution-restapi-3f10")
step4b = importlib.import_module("tml-solutions.iotsolution-restapi-3f10.tml_system_step_4b_kafka_preprocess_dag-iotsolution-restapi-3f10")

step5 = importlib.import_module("tml-solutions.iotsolution-restapi-3f10.tml_system_step_5_kafka_machine_learning_dag-iotsolution-restapi-3f10")
step6 = importlib.import_module("tml-solutions.iotsolution-restapi-3f10.tml_system_step_6_kafka_predictions_dag-iotsolution-restapi-3f10")
step7 = importlib.import_module("tml-solutions.iotsolution-restapi-3f10.tml_system_step_7_kafka_visualization_dag-iotsolution-restapi-3f10")
step8 = importlib.import_module("tml-solutions.iotsolution-restapi-3f10.tml_system_step_8_deploy_solution_to_docker_dag-iotsolution-restapi-3f10")
step9 = importlib.import_module("tml-solutions.iotsolution-restapi-3f10.tml_system_step_9_privategpt_qdrant_dag-iotsolution-restapi-3f10")
step10 = importlib.import_module("tml-solutions.iotsolution-restapi-3f10.tml_system_step_10_documentation_dag-iotsolution-restapi-3f10")

with DAG(
    dag_id="solution_preprocessing_dag_restapi-iotsolution-restapi-3f10",
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
  start_task = BashOperator(
    task_id="start_tasks_tml_preprocessing_restapi",
    bash_command="echo 'Start task'",
  )
# STEP 1: Get the Parameters
  sensor_A = PythonOperator(
            task_id="step_1_solution_task_getparams",
            python_callable=step1.getparams,
            provide_context=True,
  )

# STEP 2: Create the Kafka topics
  sensor_B = PythonOperator(
      task_id="step_2_solution_task_createtopic",
      python_callable=step2.setupkafkatopics,
      provide_context=True,
  )
# STEP 3: Produce data to topic
  sensor_C = PythonOperator(
      task_id="step_3_solution_task_producetotopic",
      python_callable=step3.startproducing,
      provide_context=True,
  )
# STEP 4: Preprocess the data
  sensor_D = PythonOperator(
      task_id="step_4_solution_task_preprocess",
      python_callable=step4.dopreprocessing,
      provide_context=True,
  )
# STEP 4b: Preprocess the data
  sensor_D2 = PythonOperator(
      task_id="step_4b_solution_task_preprocess",
      python_callable=step4b.dopreprocessing,
      provide_context=True,
  )

# STEP 7: Containerize the solution
  sensor_E = PythonOperator(
      task_id="step_7_solution_task_visualization",
      python_callable=step7.startstreamingengine,
      provide_context=True,
  )
# STEP 8: Containerize the solution
  sensor_F = PythonOperator(
      task_id="step_8_solution_task_containerize",
      python_callable=step8.dockerit,
      provide_context=True,
  )
  start_task2 = BashOperator(
    task_id="Starting_Docker",
    bash_command="echo 'Start task Completed'",
  )
  start_task3 = BashOperator(
    task_id="Starting_Documentation",
    bash_command="echo 'Start task Completed'",
  )
  start_task4 = BashOperator(
    task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
    bash_command="echo 'Start task Completed'",
  )
# STEP 10: Document the solution
  sensor_G = PythonOperator(
      task_id="step_10_solution_task_document",
      python_callable=step10.generatedoc,
      provide_context=True,
  )

  start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_D2, sensor_E] >> start_task2 >> sensor_F >> start_task3  >> sensor_G

9.6.1.1. TML Dag Parameter Changes To Be Made For: solution_preprocessing_dag_restapi-iotsolution-restapi-3f10

Note

No TML Dag parameter changes are needed for this solution.

9.6.1.2. Successful Run Screen

Below the TSS/Airflow screen that shows a successful TML solution build. All colors should be green for all of the steps. If you see a red color, it means your DAG has an error.

_images/iotrest1.png

9.6.1.3. Solution Documentation Example

This is the solution documentation that is auto-generated by TSS. Every TML solution you create will have its own auto-generated documentation that will provide details on the entire solution.

_images/iotrest3.png

Important

You will need to run the solution in your own TSS environment for the links to work in this documentation. It is provided as an example of the powerful capabilities of TSS: `https://iotsolution-restapi-3f10.readthedocs.io/en/latest/index.html>`_

Here is the Solution Real-Time Dashboard:

_images/iotsol4.png

Here is the Solution Docker Run container:

_images/iotrest2.png

9.7. Real-Time IoT Data Preprocessing With gRPC, Secondary Preprocessing and With Map Example

Solution DAG Code: solution_preprocessing_dag_grpc-iotsolution-grpc-3f10

This IoT Data Preprocessing Solution DAG: solution_preprocessing_dag_grpc-iotsolution-grpc-3f10 ingest data by gRPC API. Users can stream data using the client gRPC file into Kubernetes.

The streaming data are then processed TWICE with TML binary Viper and the output data are streamed to a browser that runs the dashboard with a map: iot-failure-seneca.html that is located in /Viperviz/viperviz/views.

The solution will automatically build and push the solution container to docker hub, automatically create documentation on READTHEDOCS.io and automatically commits your solution code to Github, all in about 2 minutes.

9.7.1. Solution DAG Code: solution_preprocessing_dag_grpc-iotsolution-grpc-3f10

from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime

import importlib
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator
)
step1 = importlib.import_module("tml-solutions.iotsolution-grpc-3f10.tml_system_step_1_getparams_dag-iotsolution-grpc-3f10")
step2 = importlib.import_module("tml-solutions.iotsolution-grpc-3f10.tml_system_step_2_kafka_createtopic_dag-iotsolution-grpc-3f10")
step3 = importlib.import_module("tml-solutions.iotsolution-grpc-3f10.tml_read_gRPC_step_3_kafka_producetotopic_dag-iotsolution-grpc-3f10")
step4 = importlib.import_module("tml-solutions.iotsolution-grpc-3f10.tml_system_step_4_kafka_preprocess_dag-iotsolution-grpc-3f10")
step4b = importlib.import_module("tml-solutions.iotsolution-grpc-3f10.tml_system_step_4b_kafka_preprocess_dag-iotsolution-grpc-3f10")

step5 = importlib.import_module("tml-solutions.iotsolution-grpc-3f10.tml_system_step_5_kafka_machine_learning_dag-iotsolution-grpc-3f10")
step6 = importlib.import_module("tml-solutions.iotsolution-grpc-3f10.tml_system_step_6_kafka_predictions_dag-iotsolution-grpc-3f10")
step7 = importlib.import_module("tml-solutions.iotsolution-grpc-3f10.tml_system_step_7_kafka_visualization_dag-iotsolution-grpc-3f10")
step8 = importlib.import_module("tml-solutions.iotsolution-grpc-3f10.tml_system_step_8_deploy_solution_to_docker_dag-iotsolution-grpc-3f10")
step9 = importlib.import_module("tml-solutions.iotsolution-grpc-3f10.tml_system_step_9_privategpt_qdrant_dag-iotsolution-grpc-3f10")
step10 = importlib.import_module("tml-solutions.iotsolution-grpc-3f10.tml_system_step_10_documentation_dag-iotsolution-grpc-3f10")

with DAG(
    dag_id="solution_preprocessing_dag_grpc-iotsolution-grpc-3f10",
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
  start_task = BashOperator(
    task_id="start_tasks_tml_preprocessing_grpc",
    bash_command="echo 'Start task'",
  )
# STEP 1: Get the Parameters
  sensor_A = PythonOperator(
            task_id="step_1_solution_task_getparams",
            python_callable=step1.getparams,
            provide_context=True,
  )

# STEP 2: Create the Kafka topics
  sensor_B = PythonOperator(
      task_id="step_2_solution_task_createtopic",
      python_callable=step2.setupkafkatopics,
      provide_context=True,
  )
# STEP 3: Produce data to topic
  sensor_C = PythonOperator(
      task_id="step_3_solution_task_producetotopic",
      python_callable=step3.startproducing,
      provide_context=True,
  )
# STEP 4: Preprocess the data
  sensor_D = PythonOperator(
      task_id="step_4_solution_task_preprocess",
      python_callable=step4.dopreprocessing,
      provide_context=True,
  )
# STEP 4b: Preprocess the data
  sensor_D2 = PythonOperator(
      task_id="step_4b_solution_task_preprocess",
      python_callable=step4b.dopreprocessing,
      provide_context=True,
  )

# STEP 7: Containerize the solution
  sensor_E = PythonOperator(
      task_id="step_7_solution_task_visualization",
      python_callable=step7.startstreamingengine,
      provide_context=True,
  )
# STEP 8: Containerize the solution
  sensor_F = PythonOperator(
      task_id="step_8_solution_task_containerize",
      python_callable=step8.dockerit,
      provide_context=True,
  )
  start_task2 = BashOperator(
    task_id="Starting_Docker",
    bash_command="echo 'Start task Completed'",
  )
  start_task3 = BashOperator(
    task_id="Starting_Documentation",
    bash_command="echo 'Start task Completed'",
  )
  start_task4 = BashOperator(
    task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
    bash_command="echo 'Start task Completed'",
  )
# STEP 10: Document the solution
  sensor_G = PythonOperator(
      task_id="step_10_solution_task_document",
      python_callable=step10.generatedoc,
      provide_context=True,
  )

  start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_D2, sensor_E] >> start_task2 >> sensor_F >> start_task3  >> sensor_G

9.7.1.1. TML Dag Parameter Changes To Be Made For: solution_preprocessing_dag_grpc-iotsolution-grpc-3f10

Note

No TML Dag parameter changes are needed for this solution.

9.7.1.2. Successful Run Screen

Below the TSS/Airflow screen that shows a successful TML solution build. All colors should be green for all of the steps. If you see a red color, it means your DAG has an error.

_images/iotgrpc1.png

9.7.1.3. Solution Documentation Example

This is the solution documentation that is auto-generated by TSS. Every TML solution you create will have its own auto-generated documentation that will provide details on the entire solution.

_images/iotgrpc3.png

Important

You will need to run the solution in your own TSS environment for the links to work in this documentation. It is provided as an example of the powerful capabilities of TSS: `https://iotsolution-grpc-3f10.readthedocs.io/en/latest/index.html>`_

Here is the Solution Real-Time Dashboard:

_images/iotsol4.png

Here is the Solution Docker Run container:

_images/iotgrpc2.png

9.8. Real-Time IoT Data Preprocessing and Machine Learning Example

Solution DAG Code: solution_preprocessing_ml_dag-myawesometmlsolutionml-3f10

This IoT Data Preprocessing and Machine Learning Solution DAG: solution_preprocessing_ml_dag-myawesometmlsolutionml-3f10 reads local file data in /rawdata/IoTdata.txt and streams it to Kafka. The streaming data are then processed and entity level machine learning is performed with TML binaries Viper and HPDE, the output data are streamed to a browser that runs the dashboard: iot-failure-machinelearning.html, that is located in /Viperviz/viperviz/views.

The solution will automatically build and push the solution container to docker hub, automatically create documentation on READTHEDOCS.io and automatically commit your solution code to Github, all in about 2 minutes.

9.8.1. Solution DAG Code: solution_preprocessing_ml_dag-myawesometmlsolutionml-3f10

from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator
)
step1 = importlib.import_module("tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_1_getparams_dag-myawesometmlsolutionml-3f10")
step2 = importlib.import_module("tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_2_kafka_createtopic_dag-myawesometmlsolutionml-3f10")
step3 = importlib.import_module("tml-solutions.myawesometmlsolutionml-3f10.tml_read_LOCALFILE_step_3_kafka_producetotopic_dag-myawesometmlsolutionml-3f10")
step4 = importlib.import_module("tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_4_kafka_preprocess_dag-myawesometmlsolutionml-3f10")
step5 = importlib.import_module("tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_5_kafka_machine_learning_dag-myawesometmlsolutionml-3f10")
step6 = importlib.import_module("tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_6_kafka_predictions_dag-myawesometmlsolutionml-3f10")
step7 = importlib.import_module("tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_7_kafka_visualization_dag-myawesometmlsolutionml-3f10")
step8 = importlib.import_module("tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_8_deploy_solution_to_docker_dag-myawesometmlsolutionml-3f10")
step9 = importlib.import_module("tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_9_privategpt_qdrant_dag-myawesometmlsolutionml-3f10")
step10 = importlib.import_module("tml-solutions.myawesometmlsolutionml-3f10.tml_system_step_10_documentation_dag-myawesometmlsolutionml-3f10")

with DAG(
    dag_id="solution_preprocessing_ml_dag-myawesometmlsolutionml-3f10",
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
  start_task = BashOperator(
    task_id="start_tasks_tml_preprocessing_ml",
    bash_command="echo 'Start task'",
  )
# STEP 1: Get the Parameters
  sensor_A = PythonOperator(
            task_id="step_1_solution_task_getparams",
            python_callable=step1.getparams,
            provide_context=True,
  )

# STEP 2: Create the Kafka topics
  sensor_B = PythonOperator(
      task_id="step_2_solution_task_createtopic",
      python_callable=step2.setupkafkatopics,
      provide_context=True,
  )
# STEP 3: Produce data to topic
  sensor_C = PythonOperator(
      task_id="step_3_solution_task_producetotopic",
      python_callable=step3.startproducing,
      provide_context=True,
  )
# STEP 4: Preprocess the data
  sensor_D = PythonOperator(
      task_id="step_4_solution_task_preprocess",
      python_callable=step4.dopreprocessing,
      provide_context=True,
  )
# STEP 5: ML
  sensor_E = PythonOperator(
      task_id="step_5_solution_task_ml",
      python_callable=step5.startml,
      provide_context=True,
  )
# STEP 6: Predictions
  sensor_F = PythonOperator(
      task_id="step_6_solution_task_prediction",
      python_callable=step6.startpredictions,
      provide_context=True,
  )

# STEP 7: Visualization the solution
  sensor_G = PythonOperator(
      task_id="step_7_solution_task_visualization",
      python_callable=step7.startstreamingengine,
      provide_context=True,
  )
# STEP 8: Containerize the solution
  sensor_H = PythonOperator(
      task_id="step_8_solution_task_containerize",
      python_callable=step8.dockerit,
      provide_context=True,
  )
  start_task2 = BashOperator(
    task_id="Starting_Docker",
    bash_command="echo 'Start task Completed'",
  )
  start_task3 = BashOperator(
    task_id="Starting_Documentation",
    bash_command="echo 'Start task Completed'",
  )
  start_task4 = BashOperator(
    task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
    bash_command="echo 'Start task Completed'",
  )
# STEP 10: Document the solution
  sensor_J = PythonOperator(
      task_id="step_10_solution_task_document",
      python_callable=step10.generatedoc,
      provide_context=True,
  )

  start_task >> sensor_A >> sensor_B >> start_task4 >> [sensor_C, sensor_D, sensor_E, sensor_F, sensor_G] >> start_task2 >> sensor_H >> start_task3 >> sensor_J

9.8.1.1. TML Dag Parameter Changes To Be Made For: solution_preprocessing_ml_dag-myawesometmlsolutionml-3f10

Tip

This is the same that is located here: solution_preprocessing_ml_dag-myawesometmlsolutionml-3f10

TML Dag Default_args Parameter To Change To New Value

TML Dag: tml_system_step_2_kafka_createtopic_dag-myawesometmlsolutionml-3f10.py

Current Value: ‘numpartitions’: ‘1’

New Value: ‘numpartitions’: ‘3’

TML Dag: tml_system_step_5_kafka_machine_learning_dag-myawesometmlsolutionml-3f10.py

Current Value: ‘islogistic’ : ‘0’

New Value: ‘islogistic’ : ‘1’

TML Dag: tml_system_step_5_kafka_machine_learning_dag-myawesometmlsolutionml-3f10.py

Current Value: ‘dependentvariable’ : ‘’

New Value: ‘dependentvariable’ : ‘failure’

TML Dag: tml_system_step_5_kafka_machine_learning_dag-myawesometmlsolutionml-3f10.py

Current Value: ‘independentvariables’: ‘’

New Value: ‘independentvariables’: ‘Power_preprocessed_Trend,Voltage_preprocessed_Trend,Current_preprocessed_Trend’

TML Dag: tml_system_step_5_kafka_machine_learning_dag-myawesometmlsolutionml-3f10.py

Current Value: ‘fullpathtotrainingdata’ : ‘/Viper-ml/viperlogs/<choose foldername>’,

New Value: ‘fullpathtotrainingdata’ : ‘/Viper-ml/viperlogs/iotlogistic’

TML Dag: tml_system_step_5_kafka_machine_learning_dag-myawesometmlsolutionml-3f10.py

Current Value: ‘processlogic’ : ‘’

New Value: processlogic’: ‘classification_name=failure_prob:Power_preprocessed_Trend=-n,0:Voltage_preprocessed_Trend=-n,0:Current_preprocessed_Trend=-n,0’

TML Dag: tml_system_step_6_kafka_predictions_dag-myawesometmlsolutionml-3f10.py

Current Value: ‘consumefrom’ : ‘’

New Value: ‘consumefrom’ : ‘ml-data’

TML Dag: tml_system_step_6_kafka_predictions_dag-myawesometmlsolutionml-3f10.py

Current Value: ‘pathtoalgos’ : ‘/Viper-ml/viperlogs/<choose foldername>’

New Value: ‘pathtoalgos’ : ‘/Viper-ml/viperlogs/iotlogistic’

TML Dag: tml_system_step_7_kafka_visualization_dag-myawesometmlsolutionml-3f10.py

Current Value: ‘topic’ : ‘iot-preprocess,iot-preprocess2’

New Value: ‘topic’ : ‘iot-preprocess,iot-ml-prediction-results-output’

TML Dag: tml_system_step_7_kafka_visualization_dag-myawesometmlsolutionml-3f10.py

Current Value: ‘dashboardhtml’: ‘dashboard.html’

New Value: ‘dashboardhtml’: ‘iot-failure-machinelearning.html’

Here is the TSS successful run:

_images/ml3.png

Here is the automated readthedocs documentation

_images/ml2.png

This is the real-time dashboard generated:

_images/mldash.png

Here is the docker container that was automatically built and pushed to Docker hub:

_images/ml4.png

9.9. Cybersecurity Solution with PrivateGPT, MQTT, HiveMQ

Solution DAG Code: solution_preprocessing_ai_mqtt_dag-cybersecuritywithprivategpt-3f10

This Cybersecurity Data Preprocessing with GenAI Solution DAG: solution_preprocessing_ai_dag-cybersecuritysolutionwithprivategpt-3f10 reads local file data in /rawdata/cisco_network_data.txt and streams it to Kafka. The streaming data are then processed, the processed output data sent to the privateGPT container and Qdrant vector DB for further analysis. Processing is done by Viper and AI is performed by privateGPT, the output data are streamed to a browser that runs the dashboard: tml-cisco-network-privategpt-monitor.html, that is located in /Viperviz/viperviz/views.

The solution will automatically build and push the solution container to docker hub, automatically create documentation on READTHEDOCS.io and automatically commit your solution code to Github, all in about 2 minutes.

Note also the solution will start the privateGPT and Qdrant containers automatically for you.

9.9.1. Solution DAG Code: solution_preprocessing_ai_mqtt_dag-cybersecuritywithprivategpt-3f10

from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
import tsslogging
import os
from datetime import datetime
import importlib
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator
)
step1 = importlib.import_module("tml-solutions.cybersecuritywithprivategpt-3f10.tml_system_step_1_getparams_dag-cybersecuritywithprivategpt-3f10")
step2 = importlib.import_module("tml-solutions.cybersecuritywithprivategpt-3f10.tml_system_step_2_kafka_createtopic_dag-cybersecuritywithprivategpt-3f10")
step3 = importlib.import_module("tml-solutions.cybersecuritywithprivategpt-3f10.tml_read_MQTT_step_3_kafka_producetotopic_dag-cybersecuritywithprivategpt-3f10")
step4 = importlib.import_module("tml-solutions.cybersecuritywithprivategpt-3f10.tml_system_step_4_kafka_preprocess_dag-cybersecuritywithprivategpt-3f10")
step5 = importlib.import_module("tml-solutions.cybersecuritywithprivategpt-3f10.tml_system_step_5_kafka_machine_learning_dag-cybersecuritywithprivategpt-3f10")
step6 = importlib.import_module("tml-solutions.cybersecuritywithprivategpt-3f10.tml_system_step_6_kafka_predictions_dag-cybersecuritywithprivategpt-3f10")
step7 = importlib.import_module("tml-solutions.cybersecuritywithprivategpt-3f10.tml_system_step_7_kafka_visualization_dag-cybersecuritywithprivategpt-3f10")
step8 = importlib.import_module("tml-solutions.cybersecuritywithprivategpt-3f10.tml_system_step_8_deploy_solution_to_docker_dag-cybersecuritywithprivategpt-3f10")
step9 = importlib.import_module("tml-solutions.cybersecuritywithprivategpt-3f10.tml_system_step_9_privategpt_qdrant_dag-cybersecuritywithprivategpt-3f10")
step10 = importlib.import_module("tml-solutions.cybersecuritywithprivategpt-3f10.tml_system_step_10_documentation_dag-cybersecuritywithprivategpt-3f10")


with DAG(
    dag_id="solution_preprocessing_ai_mqtt_dag-cybersecuritywithprivategpt-3f10",
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
  start_task = BashOperator(
    task_id="start_tasks_tml_preprocessing_ai_mqtt",
    bash_command="echo 'Start task'",
  )
# STEP 1: Get the Parameters
  sensor_A = PythonOperator(
            task_id="step_1_solution_task_getparams",
            python_callable=step1.getparams,
            provide_context=True,
  )

# STEP 2: Create the Kafka topics
  sensor_B = PythonOperator(
      task_id="step_2_solution_task_createtopic",
      python_callable=step2.setupkafkatopics,
      provide_context=True,
  )
# STEP 3: Produce data to topic
  sensor_C = PythonOperator(
      task_id="step_3_solution_task_producetotopic",
      python_callable=step3.startproducing,
      provide_context=True,
  )
# STEP 4: Preprocess the data
  sensor_D = PythonOperator(
      task_id="step_4_solution_task_preprocess",
      python_callable=step4.dopreprocessing,
      provide_context=True,
  )
# STEP 7: Containerize the solution
  sensor_E = PythonOperator(
      task_id="step_7_solution_task_visualization",
      python_callable=step7.startstreamingengine,
      provide_context=True,
  )
# STEP 8: Containerize the solution
  sensor_F = PythonOperator(
      task_id="step_8_solution_task_containerize",
      python_callable=step8.dockerit,
      provide_context=True,
  )
# STEP 9: PrivateGPT
  sensor_I = PythonOperator(
      task_id="step_9_solution_task_ai",
      python_callable=step9.startprivategpt,
      provide_context=True,
  )
  start_task2 = BashOperator(
    task_id="Starting_Docker",
    bash_command="echo 'Start task Completed'",
  )
  start_task3 = BashOperator(
    task_id="Starting_Documentation",
    bash_command="echo 'Start task Completed'",
  )
  start_task4 = BashOperator(
    task_id="Completed_TML_Setup_Now_Spawn_Main_Processes",
    bash_command="echo 'Start task Completed'",
  )
# STEP 10: Document the solution
  sensor_G = PythonOperator(
      task_id="step_10_solution_task_document",
      python_callable=step10.generatedoc,
      provide_context=True,
  )

  start_task >> sensor_A >> sensor_B  >> start_task4 >> [sensor_I, sensor_C, sensor_D, sensor_E] >> start_task2 >> sensor_F >> start_task3  >> sensor_G

9.9.1.1. TML Dag Parameter Changes To Be Made For: solution_preprocessing_ai_mqtt_dag-cybersecuritywithprivategpt-3f10

Tip

This is the same that is located here: solution_preprocessing_ai_mqtt_dag-cybersecuritywithprivategpt-3f10

TML Dag Default_args Parameter To Change To New Value

TML Dag: tml_system_step_2_kafka_createtopic_dag-cybersecuritywithprivategpt-3f10.py

Current Value: ‘raw_data_topic’ : ‘iot-raw-data’

New Value: ‘raw_data_topic’ : ‘cisco-network-mainstream’

TML Dag: tml_system_step_2_kafka_createtopic_dag-cybersecuritywithprivategpt-3f10.py

Current Value: ‘preprocess_data_topic’ : ‘iot-preprocess,iot-preprocess2’

New Value: ‘preprocess_data_topic’ : ‘cisco-network-preprocess’

TML Dag: tml_read_MQTT_step_3_kafka_producetotopic_dag-cybersecuritywithprivategpt-3f10.py

Current Value: ‘topics’ : ‘iot-raw-data’

New Value: ‘topics’ : ‘cisco-network-mainstream’

TML Dag: tml_read_MQTT_step_3_kafka_producetotopic_dag-cybersecuritywithprivategpt-3f10.py

Current Value: ‘mqtt_broker’ : ‘’

New Value: ‘mqtt_broker’ : ‘<ENTER YOUR HIVEMQ BROKER>’

For example - HIVEMQ broker should look similar to this:

b526253c5560459da5337e561c142369.s1.eu.hivemq.cloud

TML Dag: tml_read_MQTT_step_3_kafka_producetotopic_dag-cybersecuritywithprivategpt-3f10.py

Current Value: ‘mqtt_port’ : ‘’

New Value: ‘mqtt_port’ : ‘8883’,

TML Dag: tml_read_MQTT_step_3_kafka_producetotopic_dag-cybersecuritywithprivategpt-3f10.py

Current Value: ‘mqtt_subscribe_topic’ : ‘’

New Value: ‘mqtt_subscribe_topic’ : ‘tml/cybersecurity’

TML Dag: tml_read_MQTT_step_3_kafka_producetotopic_dag-cybersecuritywithprivategpt-3f10.py

Current Value: ‘mqtt_enabletls’: ‘0’

New Value: ‘mqtt_enabletls’: ‘1’

TML Dag: tml_system_step_4_kafka_preprocess_dag-cybersecuritywithprivategpt-3f10.py

Curent Value: ‘raw_data_topic’ : ‘iot-raw-data’

New Value: ‘raw_data_topic’ : ‘cisco-network-mainstream’

TML Dag: tml_system_step_4_kafka_preprocess_dag-cybersecuritywithprivategpt-3f10.py

Current Value: ‘preprocess_data_topic’ : ‘iot-preprocess’

New Value: ‘preprocess_data_topic’ : ‘cisco-network-preprocess’

TML Dag: tml_system_step_4_kafka_preprocess_dag-cybersecuritywithprivategpt-3f10.py

Current Value: ‘identifier’ : ‘IoT device performance and failures’

New Value: ‘identifier’ : ‘Detect potential cyber attacks and monitor network’

TML Dag: tml_system_step_4_kafka_preprocess_dag-cybersecuritywithprivategpt-3f10.py

Current Value: ‘preprocesstypes’ : ‘anomprob,trend,avg’

New Value: ‘preprocesstypes’ : ‘min,max,trend,anomprob,variance,avg’

TML Dag: tml_system_step_4_kafka_preprocess_dag-cybersecuritywithprivategpt-3f10.py

Current Value: ‘jsoncriteria’ : ‘uid=metadata.dsn,filter:allrecords~

subtopics=metadata.property_name~

values=datapoint.value~

identifiers=metadata.display_name~

datetime=datapoint.updated_at~

msgid=datapoint.id~

latlong=lat:long’,

New Value: ‘jsoncriteria’ : ‘uid=hostName,filter:allrecords~

subtopics=hostName,hostName,hostName~

values=inboundpackets,outboundpackets,pingStatus~

identifiers=inboundpackets,outboundpackets,pingStatus~

datetime=lastUpdated~

msgid=~

latlong=’,

TML Dag: tml_system_step_7_kafka_visualization_dag-cybersecuritywithprivategpt-3f10.py

Current Value: ‘topic’ : ‘iot-preprocess,iot-preprocess2’

New Value: ‘topic’ : ‘cisco-network-preprocess,cisco-network-privategpt’

TML Dag: tml_system_step_7_kafka_visualization_dag-cybersecuritywithprivategpt-3f10.py

Current Value: ‘dashboardhtml’ : ‘dashboard.html’

New Value: ‘dashboardhtml’: ‘tml-cisco-network-privategpt-monitor.html’

TML Dag: tml_system_step_9_privategpt_qdrant_dag-cybersecuritywithprivategpt-3f10.py

NOTE: THIS CHANGE IS ONLY FOR USERS WITHOUT ACCESS TO A NVIDIA GPU CARD

Current Value: ‘pgptcontainername’ : ‘maadsdocker/tml-privategpt-with-gpu-nvidia-amd64’,

New Value: ‘pgptcontainername’ : ‘maadsdocker/tml-privategpt-no-gpu-amd64’

TML Dag: tml_system_step_9_privategpt_qdrant_dag-cybersecuritywithprivategpt-3f10.py

Current Value: ‘consumefrom’ : ‘iot-preprocess’,

New Value: ‘consumefrom’ : ‘cisco-network-preprocess’

TML Dag: tml_system_step_9_privategpt_qdrant_dag-cybersecuritywithprivategpt-3f10.py

Current Value: ‘prompt’: ‘Do the device data show any malfunction or defects?’

New Value: ‘prompt’: ‘Do any of the values of the inbound or outbound packets look abnormal?’

TML Dag: tml_system_step_9_privategpt_qdrant_dag-cybersecuritywithprivategpt-3f10.py

Current Value: ‘context’ : ‘This is IoT data from devices. The data are

anomaly probabilities for each IoT device. If voltage or current

probabilities are low, it is likely the device is not working properly.’

New Value: ‘context’ : ‘These data are anomaly probabilities of suspicious data traffic.

A high probability of over 0.80 is likely suspicious.’

TML Dag: tml_system_step_9_privategpt_qdrant_dag-cybersecuritywithprivategpt-3f10.py

Current Value: ‘keyattribute’ : ‘Voltage,current’

New Value: ‘keyattribute’ : ‘outboundpackets,inboundpackets’

9.9.2. DAG Successful Run

_images/gptdash2.png

9.9.3. The Dashboard with PrivateGPT

_images/gptdash.png

9.9.4. The HiveMQ Cluster

_images/mqttcluster.png

9.9.5. Solution Documentation

_images/cyberdoc.png

9.9.6. Solution Docker Container

_images/dockercyber.png