umqtt - MQTT客户端
模块功能:提供创建MQTT客户端发布订阅功能。
QoS级别说明
在MQTT协议中,定义了三个级别的QoS,分别是:
QoS0 – 最多一次,是最低级别;发送者发送完消息之后,并不关心消息是否已经到达接收方;
QoS1 – 至少一次,是中间级别;发送者保证消息至少送达到接收方一次;
QoS2 – 有且仅有一次,是最高级别;保证消息送达且仅送达一次。
构造函数
umqtt.MQTTClient
class umqtt.MQTTClient(client_id, server, port=0, user=None, password=None, keepalive=0, ssl=False, ssl_params={},reconn=True,version=4)
构建mqtt连接对象。
参数描述:
client_id
- 客户端 ID,字符串类型,具有唯一性。server
- 服务端地址,字符串类型,可以是 IP 或者域名。port
- 服务器端口(可选),整数类型,默认为1883,请注意,MQTT over SSL/TLS的默认端口是8883。user
- (可选) 在服务器上注册的用户名,字符串类型。password
- (可选) 在服务器上注册的密码,字符串类型。keepalive
- (可选)客户端的keepalive超时值,整数类型,默认为0。ssl
- (可选)是否使能 SSL/TLS 支持,布尔值类型。ssl_params
- (可选)SSL/TLS 参数,字符串类型。reconn
- (可选)控制是否使用内部重连的标志,布尔值类型,默认开启为True。version
- (可选)选择使用mqtt版本,整数类型,version=3开启MQTTv3.1,默认version=4开启MQTTv3.1.1。
设置相关功能和回调
MQTTClient.set_callback
MQTTClient.set_callback(callback)
设置回调函数,收到消息时会被调用。
回调函数不建议处理阻塞API,会影响mqtt消息接收。
参数描述:
callback
- 设置异常回调函数,function(topic, data)类型
示例:
from umqtt import MQTTClient
def data_cb(topic, data):
try:
print("umqtt recv topic[%s] data:%s:"%(topic, data))
except Exception as e:
print("Handle error:%s"%str(e))
c = MQTTClient("umqtt_client", "mq.tongxinmao.com", 18830)
c.set_callback(data_cb)
MQTTClient.error_register_cb
MQTTClient.error_register_cb(callback)
设置异常回调函数,使用内部重连的情况下umqtt内部线程异常时通过回调返回error信息。
参数描述:
callback
- 设置异常回调函数,function(error)类型
示例:
from umqtt import MQTTClient
def err_cb(err):
print("thread err:%s"%err)
if err == "reconnect_start":
print("start reconnect")
elif err == "reconnect_success":
print("success reconnect")
else:
print("reconnect FAIL")
c = MQTTClient("umqtt_client", "mq.tongxinmao.com", 18830)
c.error_register_cb(err_cb)
MQTTClient.set_last_will
MQTTClient.set_last_will(topic,msg,retain=False,qos=0)
设置要发送给服务器的遗嘱,客户端没有调用disconnect()异常断开,则发送通知到其他客户端。
参数描述:
topic
- mqtt遗嘱主题,字符串类型。msg
- 遗嘱的内容,字符串类型。retain
- retain = True boker会一直保留消息,默认False,布尔值类型。qos
- 整数类型,消息服务质量(0~1)。
MQTT连接相关功能
MQTTClient.connect
MQTTClient.connect(clean_session=True)
与服务器建立连接,连接失败会导致MQTTException异常。
mqtt相关操作要在确认connect成功后执行。
参数描述:
clean_session
- 布尔值类型,可选参数,一个决定客户端类型的布尔值。 如果为True,那么代理将在其断开连接时删除有关此客户端的所有信息。 如果为False,则客户端是持久客户端,当客户端断开连接时,订阅信息和排队消息将被保留。默认为True。
返回值描述:
成功返回0,失败则抛出异常。
MQTTClient.disconnect
MQTTClient.disconnect()
与服务器断开连接,释放相关资源。
MQTTClient.close
MQTTClient.close()
释放socket资源,(注意区别disconnect方法,close只释放socket资源,disconnect包含线程等资源)。
该方法仅用于在自己实现重连时使用,具体请参照mqtt重连示例代码,正常关闭mqtt连接请使用disconnect。
MQTTClient.ping
MQTTClient.ping()
当keepalive不为0且在时限内没有通讯活动,会主动向服务器发送ping包, 检测保持连通性,keepalive为0则不开启。
发布订阅相关功能
MQTTClient.publish
MQTTClient.publish(topic, msg, retain=False, qos=0)
发布消息到对应topic。
参数描述:
topic
- mqtt 消息主题,字符串类型msg
- 需要发送的数据,字符串类型retain
- 布尔值类型,默认为False, 发布消息时把retain设置为true,即为保留信息。
MQTT服务器会将最近收到的一条RETAIN标志位为True的消息保存在服务器端, 每当MQTT客户端连接到MQTT服务器并订阅了某个topic,如果该topic下有Retained消息,那么MQTT服务器会立即向客户端推送该条Retained消息
特别注意:MQTT服务器只会为每一个Topic保存最近收到的一条RETAIN标志位为True的消息!也就是说,如果MQTT服务器上已经为某个Topic保存了一条Retained消息,当客户端再次发布一条新的Retained消息,那么服务器上原来的那条消息会被覆盖! |qos
- 整数类型, MQTT消息服务质量(默认0,可选择0或1)0:发送者只发送一次消息,不进行重试 1:发送者最少发送一次消息,确保消息到达Broker
返回值描述:
成功返回布尔型True,失败返回布尔型False。
MQTTClient.subscribe
MQTTClient.subscribe(topic,qos)
订阅mqtt主题。
参数描述:
topic
- mqtt topic主题,字符串类型。qos
- MQTT消息服务质量(默认0,可选择0或1),整数类型
0:发送者只发送一次消息,不进行重试 1:发送者最少发送一次消息,确保消息到达Broker。
MQTTClient.wait_msg
MQTTClient.wait_msg()
阻塞等待服务器消息响应。
MQTTClient.get_mqttsta
MQTTClient.get_mqttsta()
获取mqtt连接状态。
注意:BG95平台不支持该API。
如果用户调用了 disconnect() 方法之后,再调用 MQTTClient.get_mqttsta() 会返回-1,因为此时创建的对象资源等都已经被释放。
返回值描述:
0:连接成功。
1:连接中。
2:服务端连接关闭。
-1:连接异常。
示例:
'''
@Author: Baron
@Date: 2020-04-24
@LastEditTime: 2020-04-24 17:06:08
@Description: example for module umqtt
@FilePath: example_mqtt_file.py
'''
from umqtt import MQTTClient
import utime
import log
import checkNet
'''
下面两个全局变量是必须有的,用户可以根据自己的实际项目修改下面两个全局变量的值
'''
PROJECT_NAME = "QuecPython_MQTT_example"
PROJECT_VERSION = "1.0.0"
checknet = checkNet.CheckNetwork(PROJECT_NAME, PROJECT_VERSION)
# 设置日志输出级别
log.basicConfig(level=log.INFO)
mqtt_log = log.getLogger("MQTT")
state = 0
def sub_cb(topic, msg):
global state
mqtt_log.info("Subscribe Recv: Topic={},Msg={}".format(topic.decode(), msg.decode()))
state = 1
if __name__ == '__main__':
stagecode, subcode = checknet.wait_network_connected(30)
if stagecode == 3 and subcode == 1:
mqtt_log.info('Network connection successful!')
# 创建一个mqtt实例
c = MQTTClient("umqtt_client", "mq.tongxinmao.com", 18830)
# 设置消息回调
c.set_callback(sub_cb)
#建立连接
c.connect()
# 订阅主题
c.subscribe(b"/public/TEST/quecpython")
mqtt_log.info("Connected to mq.tongxinmao.com, subscribed to /public/TEST/quecpython topic" )
# 发布消息
c.publish(b"/public/TEST/quecpython", b"my name is Quecpython!")
mqtt_log.info("Publish topic: /public/TEST/quecpython, msg: my name is Quecpython")
while True:
c.wait_msg() # 阻塞函数,监听消息
if state == 1:
break
# 关闭连接
c.disconnect()
else:
mqtt_log.info('Network connection failed! stagecode = {}, subcode = {}'.format(stagecode, subcode))
MQTT断网异常重连示例
特别说明:
1.下面示例代码中mqtt的reconn参数用于控制使用或关闭umqtt内部的重连机制,默认为True,使用内部重连机制。
2.如需测试或使用外部重连机制可参考此示例代码,测试前需将reconn=False,否则默认会使用内部重连机制!
'''
@Author: Baron
@Date: 2020-04-24
@LastEditTime: 2021-05-25 17:06:08
@Description: example for module umqtt
@FilePath: example_mqtt_file.py
'''
'''
下面两个全局变量是必须有的,用户可以根据自己的实际项目修改下面两个全局变量的值,
在执行用户代码前,会先打印这两个变量的值。
'''
import utime
import log
import net
import _thread
import checkNet
import dataCall
from umqtt import MQTTClient
PROJECT_NAME = "QuecPython_MQTT_example"
PROJECT_VERSION = "1.0.0"
checknet = checkNet.CheckNetwork(PROJECT_NAME, PROJECT_VERSION)
# 调用disconnect后会通过该状态回收线程资源
TaskEnable = True
# 设置日志输出级别
log.basicConfig(level=log.INFO)
mqtt_log = log.getLogger("MQTT")
# 封装mqtt,使其可以支持更多自定义逻辑
class MqttClient():
'''
mqtt init
'''
# 说明:reconn该参数用于控制使用或关闭umqtt内部的重连机制,默认为True,使用内部重连机制。
# 如需测试或使用外部重连机制可参考此示例代码,测试前需将reconn=False,否则默认会使用内部重连机制!
def __init__(self, clientid, server, port, user=None, password=None, keepalive=0, ssl=False, ssl_params={},
reconn=True):
self.__clientid = clientid
self.__pw = password
self.__server = server
self.__port = port
self.__uasename = user
self.__keepalive = keepalive
self.__ssl = ssl
self.__ssl_params = ssl_params
self.topic = None
self.qos = None
# 网络状态标志
self.__nw_flag = True
# 创建互斥锁
self.mp_lock = _thread.allocate_lock()
# 创建类的时候初始化出mqtt对象
self.client = MQTTClient(self.__clientid, self.__server, self.__port, self.__uasename, self.__pw,
keepalive=self.__keepalive, ssl=self.__ssl, ssl_params=self.__ssl_params,
reconn=reconn)
def connect(self):
'''
连接mqtt Server
'''
self.client.connect()
# 注册网络回调函数,网络状态发生变化时触发
flag = dataCall.setCallback(self.nw_cb)
if flag != 0:
# 回调注册失败
raise Exception("Network callback registration failed")
def set_callback(self, sub_cb):
'''
设置mqtt回调消息函数
'''
self.client.set_callback(sub_cb)
def error_register_cb(self, func):
'''
注册一个接收umqtt内线程异常的回调函数
'''
self.client.error_register_cb(func)
def subscribe(self, topic, qos=0):
'''
订阅Topic
'''
self.topic = topic # 保存topic ,多个topic可使用list保存
self.qos = qos # 保存qos
self.client.subscribe(topic, qos)
def publish(self, topic, msg, qos=0):
'''
发布消息
'''
self.client.publish(topic, msg, qos)
def disconnect(self):
'''
关闭连接
'''
global TaskEnable
# 关闭wait_msg的监听线程
TaskEnable = False
# 关闭之前的连接,释放资源
self.client.disconnect()
def reconnect(self):
'''
mqtt 重连机制(该示例仅提供mqtt重连参考,根据实际情况调整)
PS:1.如有其他业务需要在mqtt重连后重新开启,请先考虑是否需要释放之前业务上的资源再进行业务重启
2.该部分需要自己根据实际业务逻辑添加,此示例只包含mqtt重连后重新订阅Topic
'''
# 判断锁是否已经被获取
if self.mp_lock.locked():
return
self.mp_lock.acquire()
# 重新连接前关闭之前的连接,释放资源(注意区别disconnect方法,close只释放socket资源,disconnect包含mqtt线程等资源)
self.client.close()
# 重新建立mqtt连接
while True:
net_sta = net.getState() # 获取网络注册信息
if net_sta != -1 and net_sta[1][0] == 1:
call_state = dataCall.getInfo(1, 0) # 获取拨号信息
if (call_state != -1) and (call_state[2][0] == 1):
try:
# 网络正常,重新连接mqtt
self.connect()
except Exception as e:
# 重连mqtt失败, 5s继续尝试下一次
self.client.close()
utime.sleep(5)
continue
else:
# 网络未恢复,等待恢复
utime.sleep(10)
continue
# 重新连接mqtt成功,订阅Topic
try:
# 多个topic采用list保存,遍历list重新订阅
if self.topic is not None:
self.client.subscribe(self.topic, self.qos)
self.mp_lock.release()
except:
# 订阅失败,重新执行重连逻辑
self.client.close()
utime.sleep(5)
continue
else:
utime.sleep(5)
continue
break # 结束循环
# 退出重连
return True
def nw_cb(self, args):
'''
dataCall 网络回调
'''
nw_sta = args[1]
if nw_sta == 1:
# 网络连接
mqtt_log.info("*** network connected! ***")
self.__nw_flag = True
else:
# 网络断线
mqtt_log.info("*** network not connected! ***")
self.__nw_flag = False
def __listen(self):
while True:
try:
if not TaskEnable:
break
self.client.wait_msg()
except OSError as e:
# 判断网络是否断线
if not self.__nw_flag:
# 网络断线等待恢复进行重连
self.reconnect()
# 在socket状态异常情况下进行重连
elif self.client.get_mqttsta() != 0 and TaskEnable:
self.reconnect()
else:
# 这里可选择使用raise主动抛出异常或者返回-1
return -1
def loop_forever(self):
_thread.start_new_thread(self.__listen, ())
if __name__ == '__main__':
'''
手动运行本例程时,可以去掉该延时,如果将例程文件名改为main.py,希望开机自动运行时,需要加上该延时,
否则无法从CDC口看到下面的 poweron_print_once() 中打印的信息
'''
utime.sleep(5)
checknet.poweron_print_once()
'''
如果用户程序包含网络相关代码,必须执行 wait_network_connected() 等待网络就绪(拨号成功);
如果是网络无关代码,可以屏蔽 wait_network_connected()
【本例程必须保留下面这一行!】
'''
checknet.wait_network_connected()
def sub_cb(topic, msg):
# global state
mqtt_log.info("Subscribe Recv: Topic={},Msg={}".format(topic.decode(), msg.decode()))
c = MqttClient("umqtt_client_753", "mq.tongxinmao.com", 18830, reconn=False)
def err_cb(error):
'''
接收umqtt线程内异常的回调函数
'''
mqtt_log.info(error)
c.reconnect() # 可根据异常进行重连
# c = MqttClient("umqtt_client_753", "mq.tongxinmao.com", 18830, reconn=False)
# 设置消息回调
c.set_callback(sub_cb)
# 设置异常回调
c.error_register_cb(err_cb)
# 建立连接
c.connect()
# 订阅主题
c.subscribe(b"/public/TEST/quecpython758")
mqtt_log.info("Connected to mq.tongxinmao.com, subscribed to /public/TEST/quecpython topic")
# 发布消息
c.publish(b"/public/TEST/quecpython758", b"my name is Quecpython!")
mqtt_log.info("Publish topic: /public/TEST/quecpython758, msg: my name is Quecpython")
# 监听mqtt消息
c.loop_forever()
# 等待5s接收消息
# PS:如果需要测试重连,包括服务器断开连接等情况,请注释掉c.disconnect()和utime.sleep(5)
# utime.sleep(5)
# 关闭连接
# c.disconnect()