gRPC Python - Lire et écrire des données de processus
Cet article décrit comment accéder et écrire des données de processus simples avec Python avec un AXC F 3152 utilisant gRPC. (https://www.plcnext.help/te/Service_Components/gRPC_Introduction.htm)
Prérequis
Nous devons d'abord préparer les fichiers requis, en dehors de l'automate, par exemple sur une machine Windows.
- Installez Python 3.9 (3.10 peut provoquer des erreurs)
- Installez le package Python requis pour générer du code à partir des fichiers .proto :
pip install grpcio-tools==1.36.1
- Téléchargez et décompressez le référentiel contenant les fichiers .proto depuis https://github.com/PLCnext/gRPC
Générer _pb2.py et _pb2_grpc.py à partir des fichiers .proto
Ensuite, nous devons générer les fichiers python requis à partir des fichiers .proto fournis. Ces derniers se trouvent dans le dossier suivant :gRPC-master/protobuf.
Utilisez ce code pour créer un script Python dans le dossier gRPC-master, par exemple, generate_grpc.py. Le scénario
- génère les fichiers requis et les place dans gRPC-master/pxc_grpc
- adapter les chemins d'importation
import glob
import os
from pathlib import Path
# create the output directory
Path('pxc_grpc').mkdir(parents=True, exist_ok=True)
grpc_command_base = 'python -m grpc_tools.protoc -I./protobuf --python_out=pxc_grpc --grpc_python_out=pxc_grpc '
import_paths = set()
# generate the *_pb2.py and *_pb2_grpc.py files
for filename in glob.iglob('./protobuf/**', recursive=True):
if filename.endswith('.proto'):
# store the import path
path_parts = filename.split(os.sep)
import_paths.add('.'.join(path_parts[1:-1]))
grpc_command = ''.join([grpc_command_base, os.path.join('.', os.path.relpath(filename))])
stream = os.popen(grpc_command)
output = stream.read()
if output != '':
print(''.join(['error/info for file ', os.path.relpath(filename), ' - ', output]))
# get the python files in the base directory
base_pys = set()
for (dirpath, dirnames, filenames) in os.walk('./pxc_grpc'):
for f in filenames:
base_pys.add(f.split('.py')[0])
break
# reformat the stored paths to adapt the import statements
try:
import_paths.remove('')
except:
pass
import_paths = list(import_paths)
import_paths.sort(key=len)
import_paths.reverse()
# adapt the imports
for filename in glob.iglob('./pxc_grpc/**', recursive=True):
if filename.endswith('.py'):
new_lines = []
with open(filename, 'r') as file:
lines = file.readlines()
for line in lines:
if line.startswith('from'):
for import_path in import_paths:
if import_path in line:
line = line.replace(import_path, ''.join(['pxc_grpc.', import_path]), 1)
break
elif line.startswith('import'):
parts = line.split()
if parts[1] in base_pys:
line = line.replace('import', 'from pxc_grpc import')
new_lines.append(line)
with open(filename, 'w') as file:
file.write(''.join(new_lines))
Ouvrez un shell et exécutez le script :pyton generate_grpc.py
Créer un projet de démonstration PLCnext
Le projet affiché doit uniquement montrer comment l'interface gRPC interagit avec le GDS. N'hésitez pas à utiliser un projet existant à la place. Pour un projet individuel, vous devez modifier les noms de port dans le script Python suivant en conséquence, par exemple, Arp.Plc.Eclr/MainInstance.strInput
.
Préparer l'automate
Installez pip pour gérer vos packages Python :
- Connectez le contrôleur AXC F 3152 à Internet.
- Entrez la commande curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py.
- Entrez ensuite la commande python3 get-pip.py.
Installez les packages requis :pip install grpcio protobuf==3.20.0
Créez un dossier 'grpc_test' dans le répertoire des projets (/opt/plcnext/projects/).
Copiez le dossier 'pxc_grpc', contenant les fichiers Python créés dans 'grpc_test', par exemple, avec WinSCP.
Créez un script Python dans le dossier 'grpc_test' nommé 'grpc_test.py' et insérez le code suivant :
import grpc
from pxc_grpc.Plc.Gds.IDataAccessService_pb2 import IDataAccessServiceReadSingleRequest, \
IDataAccessServiceReadRequest, IDataAccessServiceWriteSingleRequest, IDataAccessServiceWriteRequest
from pxc_grpc.Plc.Gds.IDataAccessService_pb2_grpc import IDataAccessServiceStub
from pxc_grpc.Plc.Gds.WriteItem_pb2 import WriteItem
def write_single_string(stub, port_name, value):
single_write_request = IDataAccessServiceWriteSingleRequest()
single_write_request.data.PortName = port_name
single_write_request.data.Value.TypeCode = 19
single_write_request.data.Value.StringValue = value
return stub.WriteSingle(single_write_request)
def write_single_int(stub, port_name, value):
single_write_request = IDataAccessServiceWriteSingleRequest()
single_write_request.data.PortName = port_name
single_write_request.data.Value.TypeCode = 6
single_write_request.data.Value.Int16Value = value
return stub.WriteSingle(single_write_request)
def write_multiple_values(stub):
write_request = IDataAccessServiceWriteRequest()
wi1 = WriteItem()
wi1.PortName = 'Arp.Plc.Eclr/MainInstance.strInput'
wi1.Value.StringValue = "test1"
wi1.Value.TypeCode = 19
wi2 = WriteItem()
wi2.PortName = 'Arp.Plc.Eclr/MainInstance.strInput2'
wi2.Value.StringValue = "test2"
wi2.Value.TypeCode = 19
# add multiple WriteItems at once
write_request.data.extend([wi1, wi2])
# add WriteItems separately
# response1.data.append(wi1)
# response1.data.append(wi2)
return stub.Write(write_request)
def read_single_value(stub, port_name):
single_read_request = IDataAccessServiceReadSingleRequest()
single_read_request.portName=port_name
return stub.ReadSingle(single_read_request)
def read_multiple_values(stub, port_names):
read_request = IDataAccessServiceReadRequest()
read_request.portNames.extend(port_names)
return stub.Read(read_request)
if __name__ == "__main__":
# create channel and stub
channel = grpc.insecure_channel('unix:/run/plcnext/grpc.sock')
stub = IDataAccessServiceStub(channel)
print(write_single_string(stub, 'Arp.Plc.Eclr/MainInstance.strInput', 'test123'))
print(write_single_int(stub, 'Arp.Plc.Eclr/MainInstance.iInput', 18))
print(write_multiple_values(stub))
r = read_single_value(stub, 'Arp.Plc.Eclr/MainInstance.strInput')
print(r)
print(r._ReturnValue.Value.TypeCode)
print(r._ReturnValue.Value.StringValue)
r = read_multiple_values(stub, ['Arp.Plc.Eclr/MainInstance.iInput', 'Arp.Plc.Eclr/MainInstance.strInput'])
for value in r._ReturnValue:
print(value, value.Value.TypeCode)
Connectez votre automate à PLCnext Engineer, téléchargez le projet et démarrez la vue en direct.
Exécuter l'exemple
Commencez maintenant l'exemple. Connectez-vous à l'automate via ssh et accédez à 'grpc_test', puis démarrez le script Python :
cd projects/grpc_test/
python3 grpc_test.py
Le gRPC permet l'interaction avec les variables GDS.
Types de données
Pour lire et écrire des variables, le type de données est requis, par exemple, wi1.Value.TypeCode = 19
. Les types sont décrits dans le fichier généré gRPC-master/pxc_grpc/ArpTypes_pb2.py
commençant à la ligne 242 :
CT_None = 0
CT_End = 0
CT_Void = 1
CT_Boolean = 2
CT_Char = 3
CT_Int8 = 4
CT_Uint8 = 5
CT_Int16 = 6
CT_Uint16 = 7
CT_Int32 = 8
CT_Uint32 = 9
CT_Int64 = 10
CT_Uint64 = 11
CT_Real32 = 12
CT_Real64 = 13
CT_Struct = 18
CT_String = 19
CT_Utf8String = 19
CT_Array = 20
CT_DateTime = 23
CT_Version = 24
CT_Guid = 25
CT_AnsiString = 26
CT_Object = 28
CT_Utf16String = 30
CT_Stream = 34
CT_Enumerator = 35
CT_SecureString = 36
CT_Enum = 37
CT_Dictionary = 38
CT_SecurityToken = 39
CT_Exception = 40
CT_IecTime = 41
CT_IecTime64 = 42
CT_IecDate = 43
CT_IecDate64 = 44
CT_IecDateTime = 45
CT_IecDateTime64 = 46
CT_IecTimeOfDay = 47
CT_IecTimeOfDay64 = 48
Les variables de valeur correspondantes, par exemple, r._ReturnValue.Value.StringValue
, peut se trouver dans le même fichier, en commençant à la ligne 365, par exemple, BoolValue
, Int8Value
, StringValue
.
Technologie industrielle
- Apacer :lancement des cartes CV110-SD et CV110-MSD dans le monde
- Apacer :série de SSD SV250 de qualité industrielle avec des vitesses de lecture/écriture de 560 et 520 Mo/s
- Comment donner du sens au Big Data :RTU et applications de contrôle de processus
- Qu'est-ce que le fraisage ? - Définition, processus et opérations
- Qu'est-ce que le forage ? - Définition, processus et conseils
- Qu'est-ce que la métallurgie des poudres ? - Définition et processus
- Qu'est-ce que le brochage ? - Processus, travail et types
- Qu'est-ce que l'usinage chimique ? - Travail et processus
- Qu'est-ce que l'usinage par ultrasons ? - Travail et processus