Dr. Arne Jachens

zigbee

mosquitto mqtt broker

First, you need to have an MQTT broker installed:
  1. sudo apt-get install mosquitto
    sudo apt-get install mosquitto-clients
  2. Check the configuration, default port is 1883:
    sudo nano /etc/mosquitto/mosquitto.conf
  3. To test mqtt manually, execute in multiple terminals:
    1. sudo systemctl stop mosquitto.service
      mosquitto -v

      This runs the broker in the foreground in verbose mode.
    2. mosquitto_sub -h localhost -t test -d
      Subscribe to the topic "test".
    3. mosquitto_pub -h localhost -t test -m message
      Publish a message to the topic "test".

  4. Run mosquitto as a service again:
    sudo systemctl start mosquitto.service
  5. To start mosquitto automatically at boot time:
    sudo systemctl enable mosquitto.service
    sudo systemctl start mosquitto.service
    sudo systemctl status mosquitto.service
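
As a scripted complement to the manual test above, here is a minimal sketch that only checks whether something accepts TCP connections on the broker port. The function name broker_reachable and the 2 s timeout are my assumptions for illustration. It does not speak MQTT, so a positive result only tells you that the port is open.

```python
import socket


def broker_reachable(host="localhost", port=1883, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the host and performs the TCP handshake
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # covers refused connections, timeouts and name resolution errors
        return False
```

Calling broker_reachable() without arguments probes localhost on the default port 1883.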

zigbee2mqtt

I followed the zigbee2mqtt installation guide:

  1. Identify your device:
    ls -l /dev/serial/by-id/
    usb-ITead_Sonoff_Zigbee_3.0_USB_Dongle_Plus_4e233f7bf112ec119ba823c7bd930c07-if00-port0 -> ../../ttyUSB0
    ls -l /dev/ttyUSB0
    crw-rw---- 1 root dialout 188, 0 May  8 12:33 /dev/ttyUSB0
  2. Install zigbee2mqtt:
    sudo apt-get install -y nodejs npm git make g++ gcc
    git clone --branch 1.27.2 https://github.com/Koenkk/zigbee2mqtt.git
    sudo mv zigbee2mqtt /opt/zigbee2mqtt
    cd /opt/zigbee2mqtt
    npm ci
  3. Edit the configuration; the frontend port must differ from the port of your standard web server (80):
    nano /opt/zigbee2mqtt/data/configuration.yaml
    # allow new devices to join
    permit_join: true
    
    mqtt:
      base_topic: zigbee2mqtt
      server: "mqtt://localhost"
      #user: EnergyManager
      #password: geheim
    
    serial:
      port: /dev/ttyUSB0
    
    frontend:
      port: 65090
      host: 0.0.0.0
    
  4. Start zigbee2mqtt:
    cd /opt/zigbee2mqtt && npm start

    If everything works fine, you should see your GUI in your browser:
    http://localhost:65090
  5. Create a daemon to run zigbee2mqtt automatically:
    sudo nano /etc/systemd/system/zigbee2mqtt.service
    [Unit]
    Description=zigbee2mqtt
    After=network.target
    
    [Service]
    ExecStart=/usr/bin/npm start
    WorkingDirectory=/opt/zigbee2mqtt
    StandardOutput=inherit
    StandardError=inherit
    Restart=always
    RestartSec=10s
    User=pi_orAnyOther
    
    [Install]
    WantedBy=multi-user.target
    
    sudo systemctl start zigbee2mqtt
    systemctl status zigbee2mqtt.service
  6. To run it automatically after boot:
    sudo systemctl enable zigbee2mqtt.service
    After a reboot, allow about 1 min for zigbee2mqtt to become operational.
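
The listing in step 1 shows the dongle owned by root with group dialout, so the account named in User= needs read and write access to the device node. A minimal sketch to check this from Python, run as that user; the helper name can_use_serial is an assumption for illustration:

```python
import os


def can_use_serial(device="/dev/ttyUSB0"):
    """Return True if the current user may read and write the device node."""
    # os.access evaluates the permissions of the calling user
    return os.path.exists(device) and os.access(device, os.R_OK | os.W_OK)
```

If this returns False although the dongle is plugged in, the usual remedy is to add the user to the dialout group (sudo usermod -a -G dialout followed by the user name) and to log in again.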

Aqara temperature & humidity sensor

  1. Open the GUI at http://localhost:65090.
    The list of devices will be empty at first. Shortly after you press the tiny button on top of the Aqara temperature and humidity sensor, it will connect in several iterations and the initial values will converge to realistic ones.
  2. Once the device is paired, it shows up with its "friendly name", something like 0x00158d000800d277. If you click on this device and switch to the tab "Exposes", you will see the signals it offers, e.g. temperature.
    You may set the "friendly name" to something that really matches your preferences.
  3. Given that the mosquitto service is running, subscribe to the sensor:
    mosquitto_sub -h localhost -t zigbee2mqtt/0x00158d000800d277
    You receive a SUBACK, but no temperature value at first. Either stay patient, or press the button on top of the sensor to receive:
    {"battery":100,"humidity":47.4,"linkquality":127,"pressure":1001,"temperature":26.24,"voltage":3175}
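
The payload above is plain JSON, so it can be converted to a Python dictionary in a few lines. This is a minimal sketch: the function name parse_reading is an assumption for illustration, and the short keys T, phi and p simply anticipate the variable names used in the Python interface.

```python
import json

# sample payload as received from the sensor above
payload = ('{"battery":100,"humidity":47.4,"linkquality":127,'
           '"pressure":1001,"temperature":26.24,"voltage":3175}')


def parse_reading(raw):
    """Convert a zigbee2mqtt JSON payload (str or bytes) to a small dict."""
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8")
    data = json.loads(raw)
    # .get() returns None for keys a message does not carry
    return {
        "T": data.get("temperature"),
        "phi": data.get("humidity"),
        "p": data.get("pressure"),
        "battery": data.get("battery"),
    }


reading = parse_reading(payload)
print(reading)
```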

Python interface

import os.path
import queue
from datetime import datetime  # used by fillQueue()
from paho.mqtt import client as mqtt_client
import json
import time
import threading

q = queue.Queue()  # queue for data exchange

mqtt_connect = False
def on_connect(client, userdata, flags, rc):
    global mqtt_connect
    if rc == 0:
        print("Connection to broker successful")
        mqtt_connect = True
    else:
        print("Connection to broker failed, return code", rc)
    
def on_message(client, userdata, message):
    """
    Callback function for MQTT client,
    feeds all received messages into the queue.
    """
    global q  # queue for data exchange
    debug = False  # set to True to print every received message
    if debug:
        print("message received", message.payload.decode("utf-8"))
        print("message topic=", message.topic)
        print("message qos=", message.qos)
        print("message retain flag=", message.retain)

    q.put(message)
    

def fillQueue():
    """
    For testing purposes only,
    fill the queue with some meaningful values.
    """
    global q
    msg = {}
    now = datetime.now()
    msg["today"]   = now.strftime("%Y-%m-%d")
    msg["hour"]    = now.strftime("%H")
    msg["minutes"] = now.strftime("%M")
    msg["seconds"] = now.strftime("%S")
    q.put(msg)


    
class HvcMqtt:
    def __init__(self, debug=False):
        """
        Initialize and configure the MQTT client,
        put all sensors into the dictionary.
        """
        self.debug = debug
        self.logPath = "./LogDir/"
        self.broker = "localhost"  # alternatively use the IP, e.g. 192.168.178.39
        self.port = 1883
        self.topic = "zigbee2mqtt/"
        #compose list of sensors here: key = column name in the log, value = friendly name in zigbee2mqtt
        self.sensor = {}
        self.sensor["S01"] = "S01"
        self.sensor["S02"] = "S02"
        self.sensor["S03"] = "S03"
        self.sensor["S04"] = "S04"
        self.sensor["S05"] = "S05"
        self.sensor["S06"] = "S06"
        self.sensor["S07"] = "S07"
        self.sensor["S08"] = "S08"
        self.sensor["S09"] = "S09"
        self.sensor["S10"] = "S10"
        
        self.vars = ["T", "phi", "p"]
        for v in self.vars:
            self.actual[v] = {}
            self.last[v] = {}
            for s in self.sensor:
                self.actual[v][s] = -999  # marker for "no value yet"
                self.last[v][s] = -999
                self.state[s] = -1
     



    def subscribe2MQTT(self):
        """
        Create MQTT client
        and subscribe it to sensorIDs
        """
        global mqtt_connect
        mqtt_topic = []
        # read Zigbee map: replies to map requests arrive on the response topic
        mqtt_topic.append(("zigbee2mqtt/bridge/response/networkmap", 0))
        for s in self.sensor:
            thisTopic = self.topic + self.sensor[s]
            mqtt_topic.append((thisTopic, 0))  # QOS=0

        if self.debug:
            print("creating new instance")
        # paho-mqtt 1.x style; version 2.x expects a callback_api_version as first argument
        client = mqtt_client.Client("TemperatureSensors")
        # attach functions to callbacks
        client.on_connect = on_connect
        client.on_message = on_message
        if self.debug:
            print("connecting to broker:", self.broker)
        client.connect(self.broker, self.port)
        client.loop_start()
        # wait for connection
        while not mqtt_connect:
            if self.debug:
                print("waiting for mqtt ....")
            time.sleep(0.1)

        if self.debug:
            print("Subscribing to sensor topics", mqtt_topic)
        client.subscribe(mqtt_topic)

    
    def initialize(self):
        """
        create MQTT client, subscribe to sensors, keep listening
        """
        if self.debug:
            print("initialize MQTT listener")

        # pass the method itself; calling it here would run it in the main thread
        worker = threading.Thread(target=self.subscribe2MQTT, daemon=True)
        worker.start()

        return
    
    def genZigbeeMap(self):
        """
        Request a network map in graphviz format from zigbee2mqtt.
        """
        client = mqtt_client.Client("ZigbeeMap")
        client.connect(self.broker, self.port)
        #mqtt_topic = "zigbee2mqtt/bridge/networkmap/graphviz"
        mqtt_topic = "zigbee2mqtt/bridge/request/networkmap"
        # the payload must be valid JSON: double quotes and lowercase false
        message = json.dumps({"type": "graphviz", "routes": False})
        # -C 1 >${file}routes.dot &

        client.publish(mqtt_topic, message)
        
        
    #receive and process sensor values
    def processQueue(self):
        """
        Process values in queue,
        message.topic indicates the sensor received
        message.payload is binary and needs to be decoded.
        The resulting string is a json object,
        that needs to be converted to dictionary
        """
        global q
        while not q.empty():
            message = q.get()
            if message is None:
                continue

            try:
                topic = message.topic
                msgStr = message.payload.decode("utf-8")
                data = json.loads(msgStr)
                sensorID = topic[topic.find("/")+1:]
                if self.debug:
                    print("data received from queue:")
                    print(sensorID)
                    print("data", data)

                # look up the sensor key; stays None for unknown topics
                thisSensor = None
                for s in self.sensor:
                    if self.sensor[s] == str(sensorID):
                        thisSensor = s
                if thisSensor is None:
                    continue

                #update values for thisSensor
                self.actual["T"][thisSensor]   = data["temperature"]
                self.actual["p"][thisSensor]   = data["pressure"]
                self.actual["phi"][thisSensor] = data["humidity"]
                self.state[thisSensor]         = data["battery"]
            except Exception:
                # e.g. test entries from fillQueue() or payloads without sensor values
                if self.debug:
                    print("message from queue:", message)
                continue

        return self.actual, self.state

    
    def writeLog(self, today, hour, minute, epsilon=1E-3):
        """
        On change, append values to logfiles.
        Since different variables do not change synchronously, split logfiles by type of variable;
        in case of pressure, the mean of all sensors might be sufficient?
        """
        #check for updated values
        anyChange = False
        change = {}
        for v in self.vars:
            change[v] = False
            for s in self.sensor:
                diff = self.actual[v][s] - self.last[v][s]
                if self.last[v][s] != 0:
                    evalChange = diff / self.last[v][s]  # relative change
                else:
                    evalChange = diff  # avoid division by zero, e.g. at 0 degrees
                if abs(evalChange) > epsilon:
                    change[v] = True
                    self.last[v][s] = self.actual[v][s]
            if self.debug:
                print(v, "changed?\t", change[v])

        #in case of change, write to log
        os.makedirs(self.logPath, exist_ok=True)  # create the log directory if missing
        file = self.logPath + today
        for v in self.vars:
            if change[v]:
                anyChange = True
                thisFile = file + "_" + v + ".dat"
                newFile = not os.path.exists(thisFile)
                # mode 'a' appends and creates the file if it does not exist
                with open(thisFile, 'a', encoding='utf-8') as fp:
                    if newFile:
                        #write header
                        myStr = " #"
                        for s in self.sensor:
                            myStr = myStr + "\t" + s
                        fp.write(myStr + "\n")

                    myStr = hour + ":" + minute
                    for s in self.sensor:
                        myStr = myStr + "\t" + '{0:5.2f}'.format(self.actual[v][s])
                    fp.write(myStr + "\n")

        return anyChange
               
 
#persistent data structures
HvcMqtt.actual = {}
HvcMqtt.last   = {}
HvcMqtt.state  = {}



if __name__ == "__main__":
    """
    Testing the threads, queue and MQTT.
    Put some equivalent into your main program.
    """
    from datetime import datetime

    
    myMqtt = HvcMqtt(True)
    myMqtt.initialize()
    #subscribe to Zigbee map
    #threading.Thread(
    #myMqtt.readZigbeeMap() 
    #generate Zigbee map
    print("generate Zigbee map")
    myMqtt.genZigbeeMap()
    #exit()
    
    def secondContainer():

        #fillQueue() #for testing purposes only
        return

    def minuteContainer(myMqtt):
        now    = datetime.now()
        today  = now.strftime("%Y-%m-%d")
        hour   = now.strftime("%H")
        minute = now.strftime("%M")
        values, state = myMqtt.processQueue()
        #save to logfile or control something...
        myMqtt.writeLog(today, hour, minute)
        print(hour+":"+minute, values["T"], values["phi"], values["p"], state)
        return

    secCount = 0
    def mainLoop():
        global secCount
        secCount = secCount + 1
        #call itself to loop forever, once per second
        threading.Timer(1, mainLoop).start()
        secondContainer()
        if secCount >= 60: # each 60 sec
            secCount = 0
            minuteContainer(myMqtt)

    mainLoop() 
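
To try the queue hand-off without a broker or sensors, the consumer side can be fed with stand-in messages. The sketch below is a simplified, self-contained variant of the logic above, not the class itself: FakeMessage, drain and the sensor name S01 are assumptions for illustration, and only the attributes topic and payload of a paho message are mimicked.

```python
import json
import queue
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Stand-in for a paho MQTT message: only topic and payload."""
    topic: str
    payload: bytes


def drain(q, known_sensors, base_topic="zigbee2mqtt/"):
    """Empty the queue and return the latest reading per known sensor."""
    latest = {}
    while not q.empty():
        msg = q.get()
        if not msg.topic.startswith(base_topic):
            continue
        sensor = msg.topic[len(base_topic):]
        if sensor not in known_sensors:
            continue  # bridge messages or unknown devices
        try:
            latest[sensor] = json.loads(msg.payload.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            continue  # skip payloads that are not valid JSON
    return latest


q = queue.Queue()
q.put(FakeMessage("zigbee2mqtt/S01", b'{"temperature": 26.24, "humidity": 47.4}'))
q.put(FakeMessage("zigbee2mqtt/bridge/state", b"online"))
q.put(FakeMessage("zigbee2mqtt/S01", b'{"temperature": 26.31, "humidity": 47.0}'))
print(drain(q, {"S01"}))
```

Only the newest reading of S01 survives, and the bridge message is ignored because its topic does not belong to a known sensor.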

Troubleshooting

[Image: AquaraTemperatureSensor_getsStuck.png]
Initially, I put all 10 Aqara temperature and humidity sensors on a table to test whether the sensor signals behave the same.
Then, to tidy up, I put them all in a box. With the sensors this close together, I observed that one sensor after the other froze.
Likewise, if sensors get stuck, check their link quality and relocate them until the value has at least two digits.
If no data was received from a sensor, it may be marked "offline" by the broker. If this happens, enable "Permit join (All)" and push the tiny button on the sensor to wake it up and let it re-join your network.
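
Both symptoms can be watched for in the logging script. A minimal sketch, assuming you store the time of the last message and the last reported linkquality per sensor yourself; the function name, both dictionaries and the one hour limit are assumptions, while the threshold of 10 reflects the two-digit rule above.

```python
import time


def find_problem_sensors(last_seen, linkquality, now=None,
                         max_age_s=3600, min_linkquality=10):
    """Return {sensor: reason} for sensors that look stuck or weak.

    last_seen:   {sensor: unix timestamp of the last message}
    linkquality: {sensor: last reported "linkquality" value}
    """
    now = time.time() if now is None else now
    problems = {}
    for sensor, seen in last_seen.items():
        if now - seen > max_age_s:
            problems[sensor] = "no data for %d s" % (now - seen)
        elif linkquality.get(sensor, 0) < min_linkquality:
            problems[sensor] = "link quality below %d" % min_linkquality
    return problems
```

A sensor reported here is a candidate for re-locating or for the re-join procedure described above.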