umqtt - MQTT Protocol

This feature is used to create MQTT clients to publish and subscribe to topics.

QoS Level Description
In MQTT protocol, three levels of QoS are defined.
QoS0 – At most once. Lowest level. After sending the message, the sender does not care whether the message has been received by the receiver.
QoS1 – At least once. Middle level. The message is guaranteed to be received by the receiver at least once.
QoS2 – Only once. Highest level. The message is guaranteed to be received by the receiver only once.

Initialize MQTT

MQTTClient

MQTTClient(client_id, server, port=0, user=None, password=None, keepalive=0, ssl=False, ssl_params={},reconn=True,version=4)

Creates MQTT clients.

  • Parameter
Parameter Type Description
client_id String Client ID. Each client ID is unique.
server String Server address, which can be an IP address or domain name.
port Integer Server port (optional). Default value: 1883. The default port of MQTT over SSL/TLS is 8883.
user String Username registered on the server (optional).
password String Password registered on the server (optional).
keepalive Integer Timeout of keep-alive (optional). Default value: 0. Unit: s.
ssl bool Enable or disable SSL/TSL encryption.
ssl_params String SSL/TLS parameter (optional).
reconn bool Enable or disable the internal reconnection mechanism (optional). Default value: True (enable).
version Integer The selected MQTT version (optional). version=3 indicates MQTTv3.1. Default value: 4. version=4 indicates MQTTv3.1.1.
  • Return Value

An MQTT client.

Set Callback Function

MQTTClient.set_callback

MQTTClient.set_callback(callback)

Sets the callback function of receiving messages.

  • Parameter
Parameter Type Description
callback function The callback function of receiving messages.
  • Return Value

None

MQTTClient.error_register_cb

MQTTClient.error_register_cb(callback)

Sets the callback function of error occurrence. When the MQTT internal thread is abnormal, the error message is returned by the callback function. The callback function can be called only when the internal reconnection is not enabled.

  • Parameter
Parameter Type Description
callback function The callback function of error occurrences.
  • Return Value

None

Example

from umqtt import MQTTClient

def err_cb(err):
    print("thread err:")
    print(err)

c = MQTTClient("umqtt_client", "mq.tongxinmao.com", 18830)
c.error_register_cb(err_cb)

MQTTClient.set_last_will

MQTTClient.set_last_will(topic,msg,retain=False,qos=0)

Sets the last will to be sent to the MQTT server. If a client ungracefully disconnects from the server without calling MQTTClient.disconnect(), the last will will be sent to other clients.

  • Parameter
Parameter Type Description
topic String Last-will topic.
msg String Last-will content
retain bool When retain = True, the MQTT broker will retain the message. Default value: False.
qos Integer Quality of Service, 0 or 1.
  • Return Value

None

MQTTClient.connect

MQTTClient.connect(clean_session=True)

Connects to MQTT server. Failed connection leads to an MQTT exception.

  • Parameter
Parameter Type Description
clean_session bool Client session type, optional parameter. If this value is True, the MQTT server will delete all information about the client when the client disconnects from the MQTT server. If this value is False, the client session is persistent, that is, when the client disconnects from the MQTT server, the subscription and queuing information will be retained. Default value: False.
  • Return Value

0 – Successful execution

Error message – Failed execution

MQTTClient.disconnect

MQTTClient.disconnect()

Disconnects from the MQTT server.

  • Parameter

None

  • Return Value

None

MQTTClient.close

MQTTClient.close()

Releases socket resources. (Please note the differences between MQTTClient.disconnect() and MQTTClient.close(), where MQTTClient.close() only releases socket resources but MQTTClient.disconnect() releases resources including threads.)

Note: This method can be used only when the client needs to reconnect to the MQTT server. See *Example of MQTT Reconnection After Ungraceful Disconnection* below for details. Call MQTTClient.disconnect() to normally disconnect from the MQTT server.

  • Parameter

None

  • Return Value

None

MQTTClient.ping

MQTTClient.ping()

Pings to MQTT server to check the connection when keepalive is not 0. When keepalive is 0, this method is disabled.

  • Parameter

None

  • Return Value

None

MQTTClient.publish

MQTTClient.publish(topic,msg, retain=False, qos=0)

Publishes messages.

  • Parameter
Parameter Type Description
topic String Message topic.
msg String Data to be sent.
retain bool Default value: False. If this value is set to True when you send a message, the message is retained.
The MQTT server retains the last received message with a RETAIN flag bit of True on the server. Whenever the MQTT client connects to the MQTT server and subscribes to a topic, if there is a Retained message under that topic, the MQTT server immediately pushes the Retained message to the client.
Note: The MQTT server will only save the last received message with the RETAIN flag bit of True for each topic, that is, if the MQTT server saves one retained message for a Topic, when the client publishes a new retained message, the original message on the server is overwritten.
qos Integer MQTT QoS, 0 or 1. Default value: 0.
0 – The sender sends a message only once.
1 – The sender sends a message at least once and guarantees that the message has been delivered to the MQTT broker.
  • Return Value

None

MQTTClient.subscribe

MQTTClient.subscribe(topic,qos)

Subscribes to MQTT topics.

  • Parameter
Parameter Type Description
topic String topic
qos Integer MQTT QoS, 0 or 1. Default value: 0.
0 – The sender sends a message only once.
1 – The sender sends a message at least once and guarantees that the message has been delivered to the MQTT broker.
  • Return Value

None

MQTTClient.check_msg

MQTTClient.check_msg()

Checks whether the MQTT server has messages to be processed.

  • Parameter

None

  • Return Value

None

MQTTClient.wait_msg

MQTTClient.wait_msg()

Blocks waiting for a message response from the MQTT server.

  • Parameter

None

  • Return Value

None

MQTTClient.get_mqttsta

MQTTClient.get_mqttsta()

Gets MQTT connection status.

Note:

  1. BG95 series module does not support the API.

  2. If you call MQTTClient.disconnect() before calling MQTTClient.get_mqttsta(), -1 will be returned, because the created object resources are released.

  • Parameter

None

  • Return Value

0 – Successful connection

1 – Connecting

2 – Server connection closed

-1 – Connection error

Example

'''
@Author: Baron
@Date: 2020-04-24
@LastEditTime: 2020-04-24 17:06:08
@Description: example for module umqtt
@FilePath: example_mqtt_file.py
'''
from umqtt import MQTTClient
import utime
import log
import checkNet


'''
The following two global variables are required. You can modify the values of the following two global variables according to your actual projects.
'''
PROJECT_NAME = "QuecPython_MQTT_example"
PROJECT_VERSION = "1.0.0"

checknet = checkNet.CheckNetwork(PROJECT_NAME, PROJECT_VERSION)

# Set the log output level.
log.basicConfig(level=log.INFO)
mqtt_log = log.getLogger("MQTT")


state = 0

def sub_cb(topic, msg):
    global state
    mqtt_log.info("Subscribe Recv: Topic={},Msg={}".format(topic.decode(), msg.decode()))
    state = 1


if __name__ == '__main__':
    stagecode, subcode = checknet.wait_network_connected(30)
    if stagecode == 3 and subcode == 1:
        mqtt_log.info('Network connection successful!')

        # Create an MQTT example.
        c = MQTTClient("umqtt_client", "mq.tongxinmao.com", 18830)
        # Set the callback function of receiving messages.
        c.set_callback(sub_cb)
        # Connect to the MQTT server.
        c.connect()
        # Subscribe to a topic.
        c.subscribe(b"/public/TEST/quecpython")
        mqtt_log.info("Connected to mq.tongxinmao.com, subscribed to /public/TEST/quecpython topic" )
        # Publish a message.
        c.publish(b"/public/TEST/quecpython", b"my name is Quecpython!")
        mqtt_log.info("Publish topic: /public/TEST/quecpython, msg: my name is Quecpython")

        while True:
            c.wait_msg()  # Blocking function of monitoring messages.
            if state == 1:
                break
        # Disconnects from the MQTT server.
        c.disconnect()
    else:
        mqtt_log.info('Network connection failed! stagecode = {}, subcode = {}'.format(stagecode, subcode))

Example of MQTT Reconnection After Ungraceful Disconnection

Note:

  1. The parameter reconn in the following example enables or disables the internal reconnection mechanism. Default value: True (enable).

  2. If you need to test or use the external reconnection mechanism, please refer to this example code below. Before testing, set reconn to False, otherwise, the internal reconnection mechanism will be used by default.

'''
@Author: Baron
@Date: 2020-04-24
@LastEditTime: 2021-05-25 17:06:08
@Description: example for module umqtt
@FilePath: example_mqtt_file.py
'''
'''
The following two global variables are required. You can modify the values of the following two global variables according to your actual projects.
The values of these two variables are printed before the user code is executed.
'''
import utime
import log
import net
import _thread
import checkNet
import dataCall
from umqtt import MQTTClient

PROJECT_NAME = "QuecPython_MQTT_example"
PROJECT_VERSION = "1.0.0"

checknet = checkNet.CheckNetwork(PROJECT_NAME, PROJECT_VERSION)

# Reclaim the thread resource through the status after calling MQTTClient.disconnect().
TaskEnable = True
# Set the log output level.
log.basicConfig(level=log.INFO)
mqtt_log = log.getLogger("MQTT")


# Encapsulate MQTT so it can support more custom logic.
class MqttClient():
    '''
    mqtt init
    '''

    # Note: The parameter reconn enables or disables the internal reconnection mechanism. Default value: True (enable).
    # If you need to test or use the external reconnection mechanism, please refer to this example code below. Before testing, set reconn to False, otherwise, the internal reconnection mechanism will be used by default.
    def __init__(self, clientid, server, port, user=None, password=None, keepalive=0, ssl=False, ssl_params={},
                 reconn=True):
        self.__clientid = clientid
        self.__pw = password
        self.__server = server
        self.__port = port
        self.__uasename = user
        self.__keepalive = keepalive
        self.__ssl = ssl
        self.__ssl_params = ssl_params
        self.topic = None
        self.qos = None
        # Network status flag.
        self.__nw_flag = True
        # Create a mutex.
        self.mp_lock = _thread.allocate_lock()
        # Create a class to initialize the MQTT object.
        self.client = MQTTClient(self.__clientid, self.__server, self.__port, self.__uasename, self.__pw,
                                 keepalive=self.__keepalive, ssl=self.__ssl, ssl_params=self.__ssl_params,
                                 reconn=reconn)

    def connect(self):
        '''
        Connect to the MQTT server.
        '''
        self.client.connect()
        # Register the callback function of network status. When the network status changes, the function will be called.
        flag = dataCall.setCallback(self.nw_cb)
        if flag != 0:
            # The network callback registration failed.
            raise Exception("Network callback registration failed")

    def set_callback(self, sub_cb):
        '''
        Set the callback function of receiving messages.
        '''
        self.client.set_callback(sub_cb)

    def error_register_cb(self, func):
        '''
        Set the callback function of receiving MQTT thread error occurrence.
        '''
        self.client.error_register_cb(func)

    def subscribe(self, topic, qos=0):
        '''
        Subscribe to topics.
        '''
        self.topic = topic  # Save the topic. Multiple topics can be saved by a list.
        self.qos = qos  # Save the QoS.
        self.client.subscribe(topic, qos)

    def publish(self, topic, msg, qos=0):
        '''
        Publish a message.
        '''
        self.client.publish(topic, msg, qos)

    def disconnect(self):
        '''
        Disconnect from the MQTT server.
        '''
        global TaskEnable
        # Close the monitoring thread of wait_msg.
        TaskEnable = False
        # Disconnect from the MQTT server and release the resources.
        self.client.disconnect()

    def reconnect(self):
        '''
        MQTT reconnection mechanism (The following example is for your reference only and you can adjust based on actual needs.)
        Note: 1. If other services need to be restarted after the client reconnects to the server, determine whether to release the resources of the previous services before restarting the services.
              2. This section needs to be added based on the actual business logic, and this example only covers the process that the client resubscribes to topics after reconnecting to the MQTT server.
        '''
        # Determine whether the lock has been acquired.
        if self.mp_lock.locked():
            return
        self.mp_lock.acquire()
        # Close the previous connection before reconnecting to release resources. Please note the differences between *MQTTClient.disconnect()* and *MQTTClient.close()*, where MQTTClient.close() only releases socket resources but *MQTTClient.disconnect()* releases resources including threads.
        self.client.close()
        # Reconnect to the MQTT server.
        while True:
            net_sta = net.getState()  # Get network registration information.
            if net_sta != -1 and net_sta[1][0] == 1:
                call_state = dataCall.getInfo(1, 0)  # Get data call information.
                if (call_state != -1) and (call_state[2][0] == 1):
                    try:
                        # The network is normal. Reconnect to the MQTT server.
                        self.connect()
                    except Exception as e:
                        # Reconnection to the MQTT server failed. Try again 5 s later.
                        self.client.close()
                        utime.sleep(5)
                        continue
                else:
                    # The network is unrestored. Please wait.
                    utime.sleep(10)
                    continue
                # Connect to the MQTT server successfully and subscribe to the topic.
                try:
                    # Multiple topics can be saved by a list. Traverse the list to resubscribe the topic.
                    if self.topic is not None:
                        self.client.subscribe(self.topic, self.qos)
                    self.mp_lock.release()
                except:
                    # Subscription failed. Reconnect to the MQTT server.
                    self.client.close()
                    utime.sleep(5)
                    continue
            else:
                utime.sleep(5)
                continue
            break  # Stop loop.
        # Exit and reconnect.
        return True

    def nw_cb(self, args):
        '''
        Call the callback function of data call.
        '''
        nw_sta = args[1]
        if nw_sta == 1:
            # Network connected.
            mqtt_log.info("*** network connected! ***")
            self.__nw_flag = True
        else:
            # Network disconnected.
            mqtt_log.info("*** network not connected! ***")
            self.__nw_flag = False

    def __listen(self):
        while True:
            try:
                if not TaskEnable:
                    break
                self.client.wait_msg()
            except OSError as e:
                # Determine whether the network is disconnected.
                if not self.__nw_flag:
                    # Reconnect after the network is restored from disconnection.
                    self.reconnect()
                # Reconnect when the socket status is abnormal.
                elif self.client.get_mqttsta() != 0 and TaskEnable:
                    self.reconnect()
                else:
                    # You can call the raise method to return an exception or -1.
                    return -1

    def loop_forever(self):
        _thread.start_new_thread(self.__listen, ())

if __name__ == '__main__':
    '''
    When running this routine manually, you can remove this delay. If you change the file name of the routine to main.py, you need to add this delay when you want to start the routine automatically. Otherwise, you cannot see the information printed in poweron_print_once() below from the CDC interface.
    '''
    utime.sleep(5)
    checknet.poweron_print_once()
    '''
    If the user program contains network-related codes, it must execute wait_network_connected() to wait for the network to be ready (successful data call); If it is a network-independent code, you can mask wait_network_connected().
    【This routine must retain the following line.】
    '''
    checknet.wait_network_connected()

    def sub_cb(topic, msg):
        # global state
        mqtt_log.info("Subscribe Recv: Topic={},Msg={}".format(topic.decode(), msg.decode()))

    c = MqttClient("umqtt_client_753", "mq.tongxinmao.com", 18830, reconn=False)

    def err_cb(error):
        '''
        Set the callback function of receiving MQTT thread error occurrence.

        '''
    	mqtt_log.info(error)
    	c.reconnect() # Reconnect to MQTT server after error occurrences.

    # c = MqttClient("umqtt_client_753", "mq.tongxinmao.com", 18830, reconn=False)
    # Set the callback function of receiving messages.
    c.set_callback(sub_cb)
    # Set the callback function of error occurrence.
    c.error_register_cb(err_cb)
    # Connect to the MQTT server.
    c.connect()
    # Subscribe to topics.
    c.subscribe(b"/public/TEST/quecpython758")
    mqtt_log.info("Connected to mq.tongxinmao.com, subscribed to /public/TEST/quecpython topic")
    # Publish a message.
    c.publish(b"/public/TEST/quecpython758", b"my name is Quecpython!")
    mqtt_log.info("Publish topic: /public/TEST/quecpython758, msg: my name is Quecpython")
    # Monitor MQTT messages. 
    c.loop_forever()
    # Wait for 5 s to receive the message.
    # Note: Comment c.disconnect () and utime.sleep(5) if you want to test the reconnection mechanism, including server disconnection.
    # utime.sleep(5)
    # Disconnect to the MQTT server.
    # c.disconnect()