1. #!/usr/bin/env /opt/rh/rh-python36/root/usr/bin/python3
  2. import paho.mqtt.client as mqtt
  3. import datetime
  4. import time
  5. from influxdb import InfluxDBClient
  6. def on_connect(client, userdata, flags, rc):
  7. print("Connected with result code "+str(rc))
  8. client.subscribe("home/#")
  9. def on_message(client, userdata, msg):
  10. print("Received a message on topic: " + msg.topic)
  11. # Use utc as timestamp
  12. receiveTime=datetime.datetime.utcnow()
  13. message=msg.payload.decode("utf-8")
  14. isfloatValue=False
  15. try:
  16. # Convert the string to a float so that it is stored as a number and not a string in the database
  17. val = float(message)
  18. isfloatValue=True
  19. except:
  20. print("Could not convert " + message + " to a float value")
  21. isfloatValue=False
  22. if isfloatValue:
  23. print(str(receiveTime) + ": " + msg.topic + " " + str(val))
  24. json_body = [
  25. {
  26. "measurement": msg.topic,
  27. "time": receiveTime,
  28. "fields": {
  29. "value": val
  30. }
  31. }
  32. ]
  33. dbclient.write_points(json_body)
  34. print("Finished writing to InfluxDB")
  35. # Set up a client for InfluxDB
  36. dbclient = InfluxDBClient('localhost', 8086, 'root', 'dusnikeair7', 'home')
  37. # Initialize the MQTT client that should connect to the Mosquitto broker
  38. client = mqtt.Client()
  39. client.username_pw_set("mqtt", "nikeair7")
  40. client.on_connect = on_connect
  41. client.on_message = on_message
  42. connOK=False
  43. while(connOK == False):
  44. try:
  45. client.connect("localhost", 1883, 60)
  46. connOK = True
  47. except:
  48. connOK = False
  49. time.sleep(2)
  50. # Blocking loop to the Mosquitto broker
  51. client.loop_forever()