Connecter PLCnext Control via MQTT à Apache Kafka
Contexte technique
Kafka
Apache Kafka est un framework pour l'ingestion, le stockage, le traitement et la redistribution des données. De nos jours, il est largement déployé dans des entreprises du monde entier. Le site officiel de Kafka offre plus d'informations sur son idée et comment la déployer. L'une de ses principales caractéristiques est le grand nombre de connecteurs déjà existants vers d'autres applications et protocoles de communication comme MQTT.
MQTT
MQTT est un protocole de messagerie léger basé sur TCP, souvent utilisé pour la communication IoT en raison de sa robustesse et de son faible encombrement. Des détails sur la norme OASIS MQTT peuvent être trouvés sur son site Web.
Vous trouverez ici un article du Makers Blog sur la compilation croisée de Mosquitto pour PLCnext, une implémentation MQTT d'Eclipse. Alternativement, le PLCnext Store propose des applications MQTT prêtes.
Exigences
- Client MQTT sur le PLCnext (voir la section précédente pour les conseils d'implémentation)
- le contrôleur est connecté à un PC/VM
- Intermédiaire MQTT sur le PC/VM (par exemple, moustique)
- Instance Kafka sur le PC/VM (voir le guide de démarrage rapide de Kafka)
Configuration
L'image suivante montre un aperçu de la configuration que nous allons implémenter pour ingérer les données du contrôle PLCnext vers Kafka. Bien qu'il soit possible d'utiliser le proxy MQTT de Confluent pour leur version de Kafka (2), nous nous concentrerons sur la solution plus générique (1). Il comprend un courtier MQTT où le client se connecte et publie des messages et un connecteur qui s'abonne à un sujet chez le courtier, traite les messages et les transmet à Kafka.
Création du connecteur
Dans ce tutoriel, notre connecteur se base sur le référentiel evokly/kafka-connect-mqtt de GitHub, sous licence MIT (informations détaillées sur la licence). Tout d'abord, nous téléchargeons et extrayons le référentiel. Étant donné que la dernière version du référentiel date de fin 2016, nous mettons à jour le build.gradle
fichier, en remplaçant les anciennes dépendances par leurs nouvelles versions :
ext { kafkaVersion = '2.6.0' }
...
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.13'
compile "org.apache.kafka:connect-api:$kafkaVersion"
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
compile 'org.bouncycastle:bcprov-jdk15on:1.67'
compile 'org.bouncycastle:bcpkix-jdk15on:1.67'
compile 'org.bouncycastle:bcpg-jdk15on:1.67'
compile 'commons-io:commons-io:2.8.0'
compile 'org.slf4j:slf4j-api:1.7.30'
testCompile 'org.slf4j:slf4j-simple:1.7.30'
}
Dans cet exemple, nous enverrons des messages String simples à Kafka. Nous devons donc éditer la classe Java DumbProcessor.java
dans le dossier /kafka-connect-mqtt-master/src/main/java/com/evokly/kafka/connect/mqtt
, qui est le processeur de message par défaut :
@Override
public SourceRecord[] getRecords(String kafkaTopic) {
return new SourceRecord[]{new SourceRecord(null, //sourcePartition
null, //sourceOffset
kafkaTopic, //topic
null, //partition
null, //keySchema
mTopic, //key
null, //valueSchema
mMessage.toString(), //value
new Long(123L))}; //long timestamp
}
Par la suite, nous construisons un Java Archive File (JAR) qui contient les dépendances :./gradlew clean jar
. Nous copions la sortie JAR kafka-connect-mqtt-1.1-SNAPSHOT.jar
qui se trouve dans le dossier /kafka-connect-mqtt-master/build/libs
au libs
répertoire de Kafka.
Nous avons également besoin d'une copie de l'archive org.eclipse.paho.client.mqttv3-1.2.5.jar dans le répertoire libs de Kafka. Nous pouvons le télécharger ici.
De plus, nous devons créer un fichier de configuration pour le connecteur mqtt.properties
dans le config
de Kafka dossier. Le fichier a le contenu suivant :
name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1
# converters for plain String messages without schemas
key.converter = org.apache.kafka.connect.storage.StringConverter
value.converter = org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
kafka.topic=test_in # Kafka destination topic for the MQTT messages
mqtt.client_id=mqtt-kafka-123
mqtt.clean_session=true
mqtt.connection_timeout=30
mqtt.keep_alive_interval=60
mqtt.server_uris=tcp://172.17.0.1:1883 # address of the MQTT broker
mqtt.topic=test/# # MQTT topic where the messages should be collected
#if we want to use our own processor class
#message_processor_class=com.evokly.kafka.connect.mqtt.sample.OwnProcessor
Test local
Nous pouvons maintenant tester notre connecteur localement. Accédez au répertoire de Kafka et démarrez une instance ZooKeeper et Broker :
# start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# start Kafka:
bin/kafka-server-start.sh config/server.properties
# start an MQTT-Broker (here a mosquitto docker container)
sudo docker run -d --name mosquitto -p 1883:1883 eclipse-mosquitto
# start the MQTT-Kafka connector
bin/connect-standalone.sh config/connect-standalone.properties config/mqtt.properties
# start a Kafka console consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_in --from-beginning --property print.value=true --property print.timestamp=true
# publish an MQTT message
mosquitto_pub -h 172.17.0.1 -p 1883 -t test/1 -m test123
Le message s'affiche dans le consommateur de la console.
Technologie industrielle
- Circuits de commande de moteur
- Circuits de contrôle
- 5 avantages du contrôle de production à distance
- Contrôle de l'impédance des vias et son influence sur l'intégrité du signal dans la conception de circuits imprimés
- Gestion d'un équipement PLCnext Control via SNMP
- Gestion de cluster sur PLCnext ?
- Tableau de bord PLCnext
- Rapports PLCnext Power BI
- Application Java sur PLCnext Control