An IIoT application for a traditional factory.
Not yet usable: this is a work in progress.
Simple Connection
from opcua import Client, ua
# Connect to the OPC UA endpoint at opc.tcp://localhost:4840, report success,
# then disconnect.
client = Client("opc.tcp://localhost:4840")

# connect() is deliberately outside the try block: if it raises, there is no
# established session to tear down, so the original connection error is
# reported as-is instead of being followed by a disconnect() call.
client.connect()
try:
    print("OPCUA connection success!")
finally:
    # Always release the connection once it has been established.
    client.disconnect()
Reading a node value
from time import sleep

from opcua import Client, ua

# Connect to the OPC UA endpoint and print the value of one node once per
# second until the script is interrupted.
client = Client("opc.tcp://localhost:4840")

# connect() is outside the try block so that a failed connection attempt is
# not followed by a disconnect() on a client that never connected.
client.connect()
try:
    # 'node_id' is a placeholder; replace it with the identifier of the node
    # to read.
    node = client.get_node('node_id')
    while True:
        print(node.get_value())
        sleep(1)
finally:
    # Runs when the loop is left through an exception (e.g. Ctrl+C).
    client.disconnect()
Reading 124 holding registers starting at address 0 (addresses 0–123) from a Modbus TCP server on port 502.
from pymodbus.client.sync import ModbusTcpClient
# Read 124 holding registers starting at address 0 from unit 1 of the Modbus
# TCP server at localhost:502 and print them.
client = ModbusTcpClient('localhost', port=502)
try:
    response = client.read_holding_registers(address=0, count=124, unit=1)
    if response.isError():
        # An error response has no `.registers` attribute; report it instead
        # of failing with an unrelated AttributeError.
        print("Modbus read failed:", response)
    else:
        print(response.registers)
finally:
    # Close the connection even if the read raised.
    client.close()
Publish a message to an MQTT broker on port 1883
from time import sleep
import paho.mqtt.publish as publish
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
    """Print the ``rc`` value passed to the connect callback.

    Installed as ``client.on_connect`` by the publishing script. The
    ``client``, ``userdata`` and ``flags`` arguments are accepted to match
    the callback signature but are not used.
    """
    print("rc: " + str(rc))
def on_publish(client, userdata, mid):
    """Print the ``mid`` value passed to the publish callback.

    Installed as ``client.on_publish`` by the publishing script. The
    ``client`` and ``userdata`` arguments are accepted to match the callback
    signature but are not used.
    """
    print("mid: " + str(mid))
# Connect to the broker at test.mosquitto.org:1883 and publish 'message' to
# the topic 'test/123' with QoS 2 once per second until interrupted.
client = mqtt.Client()
client.on_connect = on_connect
client.on_publish = on_publish
client.connect(host='test.mosquitto.org', port=1883, keepalive=60)

# Run the network loop in a background thread so the publish loop below can
# run in the foreground.
client.loop_start()
try:
    while True:
        client.publish('test/123', 'message', qos=2)
        sleep(1)
finally:
    # The loop only exits through an exception (e.g. Ctrl+C); disconnect and
    # stop the background thread instead of leaving them running.
    client.disconnect()
    client.loop_stop()
# NOTE(review): `app` is not defined anywhere in this snippet; presumably it
# is a web application object (e.g. Flask) created elsewhere - confirm.
@app.route('/')
def hello_world():
    """Return the fixed greeting served for the '/' route."""
    return 'Hello, World!'
Index data into Elasticsearch (ES)
from datetime import datetime
from elasticsearch import Elasticsearch
# Sample document to index: one string marker, one mixed-case string, an
# integer, a float, and the local time at which the document was built.
msg = dict(
    workingcode="yaypython!",
    string="abcABC",
    whole=123,
    float=654.321,
    timestamp=datetime.now(),
)
# Client created with no arguments (library default connection settings).
es = Elasticsearch()

# Create the target index; ignore=400 is passed so that an HTTP 400 response
# from this call is not raised as an error.
es.indices.create(index='python-helloworld', ignore=400)

# Store `msg` under id 42 in the index created above.
es.index(
    index="python-helloworld",
    doc_type="test-jsontype",
    id=42,
    body=msg,
)
Getting data
# Fetch the document stored under id 42 in the 'python-helloworld' index.
# The return value is not captured here; assign it to a name to inspect it.
es.get(
    index="python-helloworld",
    doc_type="test-jsontype",
    id=42,
)
Using the bulk helper
import json
from time import sleep
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch import helpers
# Repeatedly send the current contents of `dict_data` to the 'python_data'
# index through the bulk helper.
es = Elasticsearch()
dict_data = {}
while True:
    # update dict_data
    actions = [
        {
            "_index": "python_data",
            "_type": "data",
            "_source": dict_data,
        },
    ]
    helpers.bulk(es, actions)
    # Pause between iterations so the loop does not send bulk requests
    # back-to-back as fast as it can; one second matches the other polling
    # loops in this file.
    sleep(1)
- IOT homepage: app status