CNS-Camel-MS

CNS use case with Camel route (MQTT-Kafka-PostgreSQL)

Microservice related to Data Processing in the context of Construction Industry

Provided by SZTAKI - Institute for Computer Science and Control 8 months, 2 weeks ago (last modified 8 months, 1 week ago); viewed 225 times and bound 1 time

Description:

CNS use case: MQTT-Kafka-PostgreSQL data pipeline realised using an Apache Camel microservice

Classification: Other

Type: Other

Configuration Files
  • File path: /route-templates/mqtt-kafka-route.yaml
  • File content:

    # MQTT -> Kafka route
    - route:
        id: "YAML MQTT->Kafka route"
        auto-startup: true
        from:
          uri: "paho-mqtt5:${MQTT_TOPIC}"
          parameters:
            brokerUrl: "ssl://${MQTT_HOST}:${MQTT_PORT}"
            retained: "true"
          steps:
            - unmarshal:
                gzipDeflater: {}
            - convertBodyTo:
                type: "String"
                charset: "Cp1250"
            - split:
                jsonpath:
                  writeAsString: true
                  expression: "$"
            - to:
                uri: "kafka:${KAFKA_TOPIC}"
                parameters:
                  brokers: "${KAFKA_HOST}:${KAFKA_PORT}"

  • File path: /route-templates/kafka-postgres-route.yaml
  • File content:

    # Kafka -> PostgreSQL route
    - beans:
        - name: mydatabase
          type: org.apache.commons.dbcp.BasicDataSource
          properties:
            url: "jdbc:postgresql://${POSTGRESQL_HOST}:${POSTGRESQL_PORT}/${POSTGRESQL_DATABASE}"
            driver-class-name: "org.postgresql.Driver"
            username: "${POSTGRESQL_USERNAME}"
            password: "${POSTGRESQL_PASSWORD}"
    - route:
        id: "YAML Kafka->PostgreSQL route"
        auto-startup: true
        from:
          uri: "kafka:${KAFKA_TOPIC}"
          parameters:
            brokers: "${KAFKA_HOST}:${KAFKA_PORT}"
            seekTo: "END"
            pollOnError: "RECONNECT"
          steps:
            - set-header:
                name: "identifier"
                expression:
                  jsonpath: "$.identifier"
            - set-header:
                name: "event_timestamp"
                expression:
                  jsonpath: "$.timestamp"
            - set-header:
                name: "source"
                expression:
                  jsonpath: "$.source"
            - set-header:
                name: "data"
                expression:
                  jsonpath: "$.data"
            - set-header:
                name: "config"
                expression:
                  jsonpath: "$.config"
            - set-body:
                simple: "INSERT INTO data1(identifier, event_timestamp, source, data, config) VALUES ('${header[identifier]}', ${header[event_timestamp]}, '${header[source]}', '${header[data]}', '${header[config]}');"
            - to:
                uri: "jdbc:mydatabase"
                parameters:
                  allowNamedParameters: True
                  useHeadersAsParameters: True

  • File path: /usr/src/init.sh
  • File content:

    #!/bin/bash
    echo "Starting Camel Microservice..." #
    printenv #
    echo "Processing routes, substituting environment variables..." #
    cp /route-templates/* /routes/ #
    echo "Routes:" #
    ls -al /routes/ #
    sed -i 's/\${MQTT_HOST}/'${MQTT_HOST}'/g' /routes/mqtt-kafka-route.yaml || true #
    sed -i 's/\${MQTT_PORT}/'${MQTT_PORT}'/g' /routes/mqtt-kafka-route.yaml || true #
    sed -i 's/\${MQTT_TOPIC}/'${MQTT_TOPIC:1}'/g' /routes/mqtt-kafka-route.yaml || true #
    sed -i 's/\${KAFKA_HOST}/'${KAFKA_HOST}'/g' /routes/mqtt-kafka-route.yaml || true #
    sed -i 's/\${KAFKA_PORT}/'${KAFKA_PORT}'/g' /routes/mqtt-kafka-route.yaml || true #
    sed -i 's/\${KAFKA_TOPIC}/'${KAFKA_TOPIC:1}'/g' /routes/mqtt-kafka-route.yaml || true #
    echo "mqtt-kafka-route.yaml: " #
    cat /routes/mqtt-kafka-route.yaml #
    sed -i 's/\${KAFKA_HOST}/'${KAFKA_HOST}'/g' /routes/kafka-postgres-route.yaml || true #
    sed -i 's/\${KAFKA_PORT}/'${KAFKA_PORT}'/g' /routes/kafka-postgres-route.yaml || true #
    sed -i 's/\${KAFKA_TOPIC}/'${KAFKA_TOPIC:1}'/g' /routes/kafka-postgres-route.yaml || true #
    sed -i 's/\${POSTGRESQL_HOST}/'${POSTGRESQL_HOST}'/g' /routes/kafka-postgres-route.yaml || true #
    sed -i 's/\${POSTGRESQL_PORT}/'${POSTGRESQL_PORT}'/g' /routes/kafka-postgres-route.yaml || true #
    sed -i 's/\${POSTGRESQL_DATABASE}/'${POSTGRESQL_DATABASE:1}'/g' /routes/kafka-postgres-route.yaml || true #
    sed -i 's/\${POSTGRESQL_USERNAME}/'${POSTGRESQL_USERNAME}'/g' /routes/kafka-postgres-route.yaml || true #
    sed -i 's/\${POSTGRESQL_PASSWORD}/'${POSTGRESQL_PASSWORD}'/g' /routes/kafka-postgres-route.yaml || true #
    echo "kafka-postgres-route.yaml: " #
    cat /routes/kafka-postgres-route.yaml #
    echo "Java keystore file: " #
    echo "$MQTT_CLIENT_CERTIFICATE" | base64 --decode > /usr/src/keystore.jks #
    ls -l /usr/src/keystore.jks #
    echo "Starting Apache Camel..." #
    java -jar /usr/src/bb-apache-camel-0.0.1.jar #
    echo Done #
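
The substitution step in init.sh pastes each value directly into a sed expression, so a value containing `/` or `&` (plausible for a password) would either make sed fail, which `|| true` then hides, or alter the replacement text. A minimal sketch of a more defensive variant follows. `substitute_var` is a hypothetical helper that is not part of the original script, the host, database and password values are made up, and GNU `sed -i` is assumed as in the original. The sketch strips the leading slash with `${VAR#/}`, which removes the character only when it really is a slash, whereas `${VAR:1}` in init.sh always drops the first character.

```shell
# substitute_var FILE NAME VALUE  (hypothetical helper, not in the original init.sh)
# Replaces every literal ${NAME} in FILE with VALUE. The body runs in a
# subshell, so its variables do not leak into the caller.
substitute_var() (
  file=$1 name=$2 value=$3
  # Escape backslash, ampersand and the '|' delimiter, which are special on
  # the replacement side of sed's s command.
  escaped=$(printf '%s' "$value" | sed 's/[\\&|]/\\&/g')
  # GNU sed in-place edit, as in the original script.
  sed -i "s|\\\${${name}}|${escaped}|g" "$file"
)

# Demo on a throwaway template; host name and password are made-up values.
template=$(mktemp)
printf '%s\n' 'url: "jdbc:postgresql://${POSTGRESQL_HOST}/${POSTGRESQL_DATABASE}"' \
              'password: "${POSTGRESQL_PASSWORD}"' > "$template"

# Assumption: the database arrives as a path with a leading slash.
POSTGRESQL_DATABASE="/cns"
substitute_var "$template" POSTGRESQL_HOST "db.example.org"
substitute_var "$template" POSTGRESQL_DATABASE "${POSTGRESQL_DATABASE#/}"
substitute_var "$template" POSTGRESQL_PASSWORD 'p/ss&word'
cat "$template"
```

Values that contain newlines are not handled by this sketch.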

Expected Data Sources
MQTT_BROKER

MQTT data source

  • Kind: STREAM
  • Direction: SOURCE
  • Authorization type: tls_client_certificate

KAFKA_SERVER

Apache Kafka server

  • Kind: STREAM
  • Direction: BIDIRECTIONAL
  • Authorization type: none

POSTGRESQL_SERVER

PostgreSQL database server

  • Kind: DATABASE
  • Direction: SINK
  • Authorization type: username_password

Expected User's Input Parameters
POSTGRESQL_USERNAME

Username for PostgreSQL database

  • Type: text
  • Mandatory: Yes

POSTGRESQL_PASSWORD

Password for PostgreSQL database

  • Type: text
  • Mandatory: Yes

MQTT_CLIENT_CERTIFICATE

Base64 encoded JKS file containing the client certificate and CA cert to connect to MQTT broker

  • Type: text
  • Mandatory: Yes
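
MQTT_CLIENT_CERTIFICATE has to carry a binary JKS file as text. One way to produce the value, and to check that it survives the decode step used in init.sh, is sketched below. The function names and file paths are illustrative only, and the throwaway file merely stands in for a real keystore.

```shell
# encode_keystore FILE  (hypothetical helper)
# Prints FILE as a single base64 line; line breaks are removed so the value
# fits into one environment variable.
encode_keystore() {
  base64 < "$1" | tr -d '\n'
}

# decode_keystore TEXT FILE  (hypothetical helper)
# Mirrors the decode step in init.sh: base64 text in, binary file out.
decode_keystore() {
  printf '%s' "$1" | base64 --decode > "$2"
}

# Round-trip check with a placeholder file instead of a real JKS keystore.
tmpdir=$(mktemp -d)
printf 'not-a-real-keystore\n' > "$tmpdir/keystore.jks"
MQTT_CLIENT_CERTIFICATE=$(encode_keystore "$tmpdir/keystore.jks")
decode_keystore "$MQTT_CLIENT_CERTIFICATE" "$tmpdir/decoded.jks"
cmp "$tmpdir/keystore.jks" "$tmpdir/decoded.jks" && echo "round trip OK"
```

The docker-compose data passes the same file as both trust store and key store, so the JKS is expected to hold the CA certificate as well as the client certificate.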

Container Deployment Information
  • Format: docker-compose
  • Data:
    services:
      camel:
        command: bash /usr/src/init.sh
        container_name: camel
        environment:
        - MQTT_HOST={{MQTT_BROKER.host}}
        - MQTT_PORT={{MQTT_BROKER.port}}
        - MQTT_TOPIC={{MQTT_BROKER.path}}
        - KAFKA_HOST={{KAFKA_SERVER.host}}
        - KAFKA_PORT={{KAFKA_SERVER.port}}
        - KAFKA_TOPIC={{KAFKA_SERVER.path}}
        - POSTGRESQL_HOST={{POSTGRESQL_SERVER.host}}
        - POSTGRESQL_PORT={{POSTGRESQL_SERVER.port}}
        - POSTGRESQL_DATABASE={{POSTGRESQL_SERVER.path}}
        - POSTGRESQL_USERNAME=${POSTGRESQL_USERNAME}
        - POSTGRESQL_PASSWORD=${POSTGRESQL_PASSWORD}
        - MQTT_CLIENT_CERTIFICATE=${MQTT_CLIENT_CERTIFICATE}
        - JAVA_TOOL_OPTIONS=-Xmx1G -Djavax.net.ssl.trustStore=/usr/src/keystore.jks -Djavax.net.ssl.trustStorePassword=changeit
          -Djavax.net.ssl.keyStore=/usr/src/keystore.jks -Djavax.net.ssl.keyStorePassword=changeit
        image: dbs-container-repo.emgora.eu/apache-camel:3.20.5
    version: '3'
  • GUI Microservice: No
  • Workload type: service
  • Opened ports:
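
The environment list in the deployment data is the only source of values for the placeholders in the route templates. Because every sed call in init.sh ends in `|| true`, an empty or missing variable would not stop the container; it would simply produce a broken route file. A pre-flight check along the following lines could catch that early. `require_vars` is a hypothetical helper, not something the original script contains, and the two DEMO variables exist only to show both outcomes.

```shell
# require_vars NAME...  (hypothetical helper, not part of the original init.sh)
# Returns 0 when every named variable is set and non-empty; otherwise lists
# the missing names on stderr and returns 1.
require_vars() {
  _missing=""
  for _name in "$@"; do
    # Indirect lookup of the variable whose name is stored in _name.
    eval "_value=\${$_name:-}"
    [ -n "$_value" ] || _missing="$_missing $_name"
  done
  if [ -n "$_missing" ]; then
    echo "Missing required variables:$_missing" >&2
    return 1
  fi
  return 0
}

# Demo: one variable set, one left empty.
DEMO_SET="value"
DEMO_EMPTY=""
if require_vars DEMO_SET DEMO_EMPTY; then
  echo "all variables present"
else
  echo "refusing to start"
fi

# In init.sh the call could look like this (names taken from the environment list):
# require_vars MQTT_HOST MQTT_PORT MQTT_TOPIC KAFKA_HOST KAFKA_PORT KAFKA_TOPIC \
#   POSTGRESQL_HOST POSTGRESQL_PORT POSTGRESQL_DATABASE POSTGRESQL_USERNAME \
#   POSTGRESQL_PASSWORD MQTT_CLIENT_CERTIFICATE || exit 1
```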