... optimizing your manufacturing processes with modular digital twins composed of expert assets...
CNS use case with Camel route (MQTT-Kafka-PostgresSQL)
Microservice related to Data Processing in the context of Construction Industry
Description:
CNS use case: MQTT-Kafka-PostgresSQL data pipeline realised using Apache Camel Microservice
Classification: Other
Type: Other
# MQTT -> Kafka route
- route:
id: "YAML MQTT->Kafka route"
auto-startup: true
from:
uri: "paho-mqtt5:${MQTT_TOPIC}"
parameters:
brokerUrl: "ssl://${MQTT_HOST}:${MQTT_PORT}"
retained: "true"
steps:
- unmarshal:
gzipDeflater: {}
- convertBodyTo:
type: "String"
charset: "Cp1250"
- split:
jsonpath:
writeAsString: true
expression: "$"
- to:
uri: "kafka:${KAFKA_TOPIC}"
parameters:
brokers: ${KAFKA_HOST}:${KAFKA_PORT}
# Kafka -> PostgreSQL route
- beans:
- name: mydatabase
type: org.apache.commons.dbcp.BasicDataSource
properties:
url: "jdbc:postgresql://${POSTGRESQL_HOST}:${POSTGRESQL_PORT}/${POSTGRESQL_DATABASE}"
driver-class-name: "org.postgresql.Driver"
username: "${POSTGRESQL_USERNAME}"
password: "${POSTGRESQL_PASSWORD}"
- route:
id: "YAML Kafka->PostgreSQL route"
auto-startup: true
from:
uri: "kafka:${KAFKA_TOPIC}"
parameters:
brokers: "${KAFKA_HOST}:${KAFKA_PORT}"
seekTo: "END"
pollOnError: "RECONNECT"
steps:
- set-header:
name: "identifier"
expression:
jsonpath: "$.identifier"
- set-header:
name: "event_timestamp"
expression:
jsonpath: "$.timestamp"
- set-header:
name: "source"
expression:
jsonpath: "$.source"
- set-header:
name: "data"
expression:
jsonpath: "$.data"
- set-header:
name: "config"
expression:
jsonpath: "$.config"
- set-body:
simple: "INSERT INTO data1(identifier, event_timestamp, source, data, config) VALUES ('${header[identifier]}', ${header[event_timestamp]}, '${header[source]}', '${header[data]}', '${header[config]}');"
- to:
uri: "jdbc:mydatabase"
parameters:
allowNamedParameters: True
useHeadersAsParameters: True
#!/bin/bash
echo "Starting Camel Microservice..." #
printenv #
echo "Processing routes, substituting environment variables..." #
cp /route-templates/* /routes/ #
echo "Routes:" #
ls -al /routes/ #
sed -i 's/\${MQTT_HOST}/'${MQTT_HOST}'/g' /routes/mqtt-kafka-route.yaml || true #
sed -i 's/\${MQTT_PORT}/'${MQTT_PORT}'/g' /routes/mqtt-kafka-route.yaml || true #
sed -i 's/\${MQTT_TOPIC}/'${MQTT_TOPIC:1}'/g' /routes/mqtt-kafka-route.yaml || true #
sed -i 's/\${KAFKA_HOST}/'${KAFKA_HOST}'/g' /routes/mqtt-kafka-route.yaml || true #
sed -i 's/\${KAFKA_PORT}/'${KAFKA_PORT}'/g' /routes/mqtt-kafka-route.yaml || true #
sed -i 's/\${KAFKA_TOPIC}/'${KAFKA_TOPIC:1}'/g' /routes/mqtt-kafka-route.yaml || true #
echo "mqtt-kafka-route.yaml: " #
cat /routes/mqtt-kafka-route.yaml #
sed -i 's/\${KAFKA_HOST}/'${KAFKA_HOST}'/g' /routes/kafka-postgres-route.yaml || true#
sed -i 's/\${KAFKA_PORT}/'${KAFKA_PORT}'/g' /routes/kafka-postgres-route.yaml || true #
sed -i 's/\${KAFKA_TOPIC}/'${KAFKA_TOPIC:1}'/g' /routes/kafka-postgres-route.yaml || true #
sed -i 's/\${POSTGRESQL_HOST}/'${POSTGRESQL_HOST}'/g' /routes/kafka-postgres-route.yaml || true #
sed -i 's/\${POSTGRESQL_PORT}/'${POSTGRESQL_PORT}'/g' /routes/kafka-postgres-route.yaml || true #
sed -i 's/\${POSTGRESQL_DATABASE}/'${POSTGRESQL_DATABASE:1}'/g' /routes/kafka-postgres-route.yaml || true #
sed -i 's/\${POSTGRESQL_USERNAME}/'${POSTGRESQL_USERNAME}'/g' /routes/kafka-postgres-route.yaml || true #
sed -i 's/\${POSTGRESQL_PASSWORD}/'${POSTGRESQL_PASSWORD}'/g' /routes/kafka-postgres-route.yaml || true #
echo "kafka-postgres-route.yaml: " #
cat /routes/kafka-postgres-route.yaml #
echo "Java keystore file: " #
echo $MQTT_CLIENT_CERTIFICATE | base64 --decode > /usr/src/keystore.jks #
ls -l /usr/src/keystore.jks #
echo "Starting Apache Camel..." #
java -jar /usr/src/bb-apache-camel-0.0.1.jar #
echo Done #
MQTT data source
Apache Kafka server
PostgreSQL database server
Username for PostgreSQL database
Password for PostgreSQL database
Base64 encoded JKS file containing the client certificate and CA cert to connect to MQTT broker
01. services: 02. camel: 03. command: bash /usr/src/init.sh 04. container_name: camel 05. environment: 06. - MQTT_HOST={{MQTT_BROKER.host}} 07. - MQTT_PORT={{MQTT_BROKER.port}} 08. - MQTT_TOPIC={{MQTT_BROKER.path}} 09. - KAFKA_HOST={{KAFKA_SERVER.host}} 10. - KAFKA_PORT={{KAFKA_SERVER.port}} 11. - KAFKA_TOPIC={{KAFKA_SERVER.path}} 12. - POSTGRESQL_HOST={{POSTGRESQL_SERVER.host}} 13. - POSTGRESQL_PORT={{POSTGRESQL_SERVER.port}} 14. - POSTGRESQL_DATABASE={{POSTGRESQL_SERVER.path}} 15. - POSTGRESQL_USERNAME=${POSTGRESQL_USERNAME} 16. - POSTGRESQL_PASSWORD=${POSTGRESQL_PASSWORD} 17. - MQTT_CLIENT_CERTIFICATE=${MQTT_CLIENT_CERTIFICATE} 18. - JAVA_TOOL_OPTIONS=-Xmx1G -Djavax.net.ssl.trustStore=/usr/src/keystore.jks -Djavax.net.ssl.trustStorePassword=changeit 19. -Djavax.net.ssl.keyStore=/usr/src/keystore.jks -Djavax.net.ssl.keyStorePassword=changeit 20. image: dbs-container-repo.emgora.eu/apache-camel:3.20.5 21. version: '3' 22.