micropython-mqtt
MQTT库目前使用的是micropython-mqtt开源库。对比参考 micropython-mqtt。
备注
网关内默认没有集成此库,需要自己选择合并到脚本中使用。
为了支持在网关内运行使用,示例移除了network sta相关处理。
示例
说明
为了方便快速测试,示例使用 test.mosquitto.org
提供的测试服务。
参数 |
说明 |
示例 |
---|---|---|
client_id |
Client ID |
CC:1B:E0:00:00:01 |
server |
MQTT Broker地址 |
test.mosquitto.org |
port |
MQTT Broker端口 |
1884 |
user |
用户名 |
rw |
password |
密码 |
readwrite |
upstream topic |
发布Topic |
/cassia/test/up |
downstream topic |
订阅Topic |
/cassia/test/down |
测试工具使用 mosquitto_sub/mosquitto_pub
,您也可以使用 MQTTX
或者测试脚本进行。
# Subscribe test topic
mosquitto_sub -h test.mosquitto.org -p 1884 -u rw -P readwrite -t '/cassia/test/up'
# Publish test message
mosquitto_pub -h test.mosquitto.org -p 1884 -u rw -P readwrite -t '/cassia/test/down' -m '{"hello":"world"}'
代码
1# ===========================
2# micropython-mqtt mqtt_as.py
3# ===========================
4
5# mqtt_as.py Asynchronous version of umqtt.robust
6# (C) Copyright Peter Hinch 2017-2025.
7# Released under the MIT licence.
8
9# Pyboard D support added also RP2/default
10# Various improvements contributed by Kevin Köck
11# V5 support added by Bob Veringa.
12# Also other contributors.
13
14import gc
15import socket
16import struct
17
18gc.collect()
19import asyncio
20
21gc.collect()
22from time import ticks_ms, ticks_diff
23from errno import EINPROGRESS, ETIMEDOUT
24
25gc.collect()
26from sys import platform
27
28VERSION = (0, 8, 4)
29# Default initial size for input messge buffer. Increase this if large messages
30# are expected, but rarely, to avoid big runtime allocations
31IBUFSIZE = 50
32# By default the callback interface returns and incoming message as bytes.
33# For performance reasons with large messages it may return a memoryview.
34MSG_BYTES = True
35
36# Legitimate errors while waiting on a socket. See uasyncio __init__.py open_connection().
37ESP32 = platform == "esp32"
38RP2 = platform == "rp2"
39if ESP32:
40 # https://forum.micropython.org/viewtopic.php?f=16&t=3608&p=20942#p20942
41 BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT, 118, 119] # Add in weird ESP32 errors
42elif RP2:
43 BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT, -110]
44else:
45 from errno import ENOTCONN
46
47 BUSY_ERRORS = [ENOTCONN, EINPROGRESS, ETIMEDOUT]
48
49
50# Default "do little" coro for optional user replacement
51async def eliza(*_): # e.g. via set_wifi_handler(coro): see test program
52 await asyncio.sleep_ms(0)
53
54
55class MsgQueue:
56 def __init__(self, size):
57 self._q = [0 for _ in range(max(size, 4))]
58 self._size = size
59 self._wi = 0
60 self._ri = 0
61 self._evt = asyncio.Event()
62 self.discards = 0
63
64 def put(self, *v):
65 self._q[self._wi] = v
66 self._evt.set()
67 self._wi = (self._wi + 1) % self._size
68 if self._wi == self._ri: # Would indicate empty
69 self._ri = (self._ri + 1) % self._size # Discard a message
70 self.discards += 1
71
72 def __aiter__(self):
73 return self
74
75 async def __anext__(self):
76 if self._ri == self._wi: # Empty
77 self._evt.clear()
78 await self._evt.wait()
79 r = self._q[self._ri]
80 self._ri = (self._ri + 1) % self._size
81 return r
82
83
84class MQTTException(Exception):
85 pass
86
87
88def pid_gen():
89 pid = 0
90 while True:
91 pid = pid + 1 if pid < 65535 else 1
92 yield pid
93
94
95def qos_check(qos):
96 if not (qos == 0 or qos == 1):
97 raise ValueError("Only qos 0 and 1 are supported.")
98
99
100# Populate a byte array with a variable byte integer. Args: buf the bytearray,
101# offs: start offset. x the value. Returns the end offset.
102# 1-4 bytes allowed, encoding up to 268,435,455 (V3.1.1 table 2.4). No point trapping this.
103def vbi(buf: bytearray, offs: int, x: int):
104 buf[offs] = x & 0x7F
105 if x := x >> 7:
106 buf[offs] |= 0x80
107 return vbi(buf, offs + 1, x) if x else (offs + 1)
108
109
110encode_properties = None
111decode_properties = None
112
113
114class MQTT_base:
115 REPUB_COUNT = 0 # TEST
116 DEBUG = False
117
118 def __init__(self, config):
119 self._events = config["queue_len"] > 0
120 # MQTT config
121 self._client_id = config["client_id"]
122 self._user = config["user"]
123 self._pswd = config["password"]
124 self._keepalive = config["keepalive"]
125 if self._keepalive >= 65536:
126 raise ValueError("invalid keepalive time")
127 self._response_time = (
128 config["response_time"] * 1000
129 ) # Repub if no PUBACK received (ms).
130 self._max_repubs = config["max_repubs"]
131 self._clean_init = config[
132 "clean_init"
133 ] # clean_session state on first connection
134 self._clean = config["clean"] # clean_session state on reconnect
135 will = config["will"]
136 if will is None:
137 self._lw_topic = False
138 else:
139 self._set_last_will(*will)
140 self._ssl = config["ssl"]
141 self._ssl_params = config["ssl_params"]
142 # Callbacks and coros
143 if self._events:
144 self.up = asyncio.Event()
145 self.down = asyncio.Event()
146 self.queue = MsgQueue(config["queue_len"])
147 self._cb = self.queue.put
148 else: # Callbacks
149 self._cb = config["subs_cb"]
150 self._wifi_handler = config["wifi_coro"]
151 self._connect_handler = config["connect_coro"]
152 # Network
153 self.port = config["port"]
154 if self.port == 0:
155 self.port = 8883 if self._ssl else 1883
156 self.server = config["server"]
157 if self.server is None:
158 raise ValueError("no server specified.")
159 self._sock = None
160
161 self.newpid = pid_gen()
162 self.rcv_pids = set() # PUBACK and SUBACK pids awaiting ACK response
163 self.last_rx = ticks_ms() # Time of last communication from broker
164 self.lock = asyncio.Lock()
165 self._ibuf = bytearray(IBUFSIZE)
166 self._mvbuf = memoryview(self._ibuf)
167
168 self.mqttv5 = config.get("mqttv5")
169 self.mqttv5_con_props = config.get("mqttv5_con_props")
170 self.topic_alias_maximum = 0
171
172 if self.mqttv5:
173 global encode_properties, decode_properties
174 from .mqtt_v5_properties import encode_properties, decode_properties # noqa
175
176 def _set_last_will(self, topic, msg, retain=False, qos=0):
177 qos_check(qos)
178 if not topic:
179 raise ValueError("Empty topic.")
180 self._lw_topic = topic
181 self._lw_msg = msg
182 self._lw_qos = qos
183 self._lw_retain = retain
184
185 def dprint(self, msg, *args):
186 if self.DEBUG:
187 print(msg % args)
188
189 def _timeout(self, t):
190 return ticks_diff(ticks_ms(), t) > self._response_time
191
192 async def _as_read(self, n, sock=None): # OSError caught by superclass
193 if sock is None:
194 sock = self._sock
195 # Ensure input buffer is big enough to hold data. It keeps the new size
196 oflow = n - len(self._ibuf)
197 if oflow > 0: # Grow the buffer and re-create the memoryview
198 # Avoid too frequent small allocations by adding some extra bytes
199 self._ibuf.extend(bytearray(oflow + 50))
200 self._mvbuf = memoryview(self._ibuf)
201 buffer = self._mvbuf
202 size = 0
203 t = ticks_ms()
204 while size < n:
205 if self._timeout(t) or not self.isconnected():
206 raise OSError(-1, "Timeout on socket read")
207 try:
208 msg_size = sock.readinto(buffer[size:], n - size)
209 except OSError as e: # ESP32 issues weird 119 errors here
210 msg_size = None
211 if e.args[0] not in BUSY_ERRORS:
212 raise
213 if msg_size == 0: # Connection closed by host
214 raise OSError(-1, "Connection closed by host")
215 if msg_size is not None: # data received
216 size += msg_size
217 t = ticks_ms()
218 self.last_rx = ticks_ms()
219 await asyncio.sleep_ms(0)
220 return buffer[:n]
221
222 async def _as_write(self, bytes_wr, length=0, sock=None):
223 if sock is None:
224 sock = self._sock
225
226 # Wrap bytes in memoryview to avoid copying during slicing
227 bytes_wr = memoryview(bytes_wr)
228 if length:
229 bytes_wr = bytes_wr[:length]
230 t = ticks_ms()
231 while bytes_wr:
232 if self._timeout(t) or not self.isconnected():
233 raise OSError(-1, "Timeout on socket write")
234 try:
235 n = sock.write(bytes_wr)
236 except OSError as e: # ESP32 issues weird 119 errors here
237 n = 0
238 if e.args[0] not in BUSY_ERRORS:
239 raise
240 if n:
241 t = ticks_ms()
242 bytes_wr = bytes_wr[n:]
243 await asyncio.sleep_ms(0)
244
245 async def _send_str(self, s):
246 await self._as_write(struct.pack("!H", len(s)))
247 await self._as_write(s)
248
249 # Receive a Variable Byte Integer and decode.
250 async def _recv_len(self, d=0, i=0):
251 s = (await self._as_read(1))[0]
252 d |= (s & 0x7F) << (i * 7)
253 return await self._recv_len(d, i + 1) if (s & 0x80) else (d, i + 1)
254
255 async def _connect(self, clean):
256 mqttv5 = self.mqttv5 # Cache local
257 self._sock = socket.socket()
258 self._sock.setblocking(False)
259 try:
260 self._sock.connect(self._addr)
261 except OSError as e:
262 if e.args[0] not in BUSY_ERRORS:
263 raise
264 await asyncio.sleep_ms(0)
265 self.dprint("Connecting to broker.")
266 if self._ssl:
267 try:
268 import ssl
269 except ImportError:
270 import ussl as ssl
271
272 self._sock = ssl.wrap_socket(self._sock, **self._ssl_params)
273 premsg = bytearray(b"\x10\0\0\0\0\0")
274 msg = bytearray(b"\x04MQTT\x00\0\0\0")
275 msg[5] = 0x05 if mqttv5 else 0x04
276
277 sz = 10 + 2 + len(self._client_id)
278 msg[6] = clean << 1
279 if self._user:
280 sz += 2 + len(self._user) + 2 + len(self._pswd)
281 msg[6] |= 0xC0
282 if self._keepalive:
283 msg[7] |= self._keepalive >> 8
284 msg[8] |= self._keepalive & 0x00FF
285 if self._lw_topic:
286 sz += 2 + len(self._lw_topic) + 2 + len(self._lw_msg)
287 if mqttv5:
288 # Extra for the will properties
289 sz += 1
290 msg[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3
291 msg[6] |= self._lw_retain << 5
292
293 if mqttv5:
294 properties = encode_properties(self.mqttv5_con_props)
295 sz += len(properties)
296
297 i = vbi(premsg, 1, sz) # sz -> Variable Byte Integer
298 await self._as_write(premsg, i + 1)
299 await self._as_write(msg)
300 if mqttv5:
301 await self._as_write(properties)
302
303 await self._send_str(self._client_id)
304 if self._lw_topic:
305 if mqttv5:
306 # We don't support will properties, so we send 0x00 for properties length
307 await self._as_write(b"\x00")
308 await self._send_str(self._lw_topic)
309 await self._send_str(self._lw_msg)
310 if self._user:
311 await self._send_str(self._user)
312 await self._send_str(self._pswd)
313 # Await CONNACK
314 # read causes ECONNABORTED if broker is out; triggers a reconnect.
315 del premsg, msg
316 packet_type = await self._as_read(1)
317 if packet_type[0] != 0x20:
318 raise OSError(-1, "CONNACK not received")
319 # The connect packet has changed, so size might be different now. But
320 # we can still handle it the same for 3.1.1 and v5
321 sz, _ = await self._recv_len()
322 if not mqttv5 and sz != 2:
323 raise OSError(-1, "Invalid CONNACK packet")
324
325 # Only read the first 2 bytes, as properties have their own length
326 connack_resp = await self._as_read(2)
327
328 # Connect ack flags
329 if connack_resp[0] != 0:
330 raise OSError(-1, "CONNACK flags not 0")
331 # Reason code
332 if connack_resp[1] != 0:
333 # On MQTTv5 Reason codes below 128 may need to be handled
334 # differently. For now, we just raise an error. Spec is a bit weird
335 # on this.
336 raise OSError(-1, "CONNACK reason code 0x%x" % connack_resp[1])
337
338 del connack_resp
339 if not mqttv5:
340 # If we are not on MQTTv5 we can stop here
341 return
342
343 connack_props_length, _ = await self._recv_len()
344 if connack_props_length > 0:
345 connack_props = await self._as_read(connack_props_length)
346 decoded_props = decode_properties(connack_props, connack_props_length)
347 self.dprint("CONNACK properties: %s", decoded_props)
348 self.topic_alias_maximum = decoded_props.get(0x22, 0)
349
350 async def _ping(self):
351 async with self.lock:
352 await self._as_write(b"\xc0\0")
353
354 # Check internet connectivity by sending DNS lookup to Google's 8.8.8.8
355 async def wan_ok(
356 self,
357 packet=b"$\x1a\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03www\x06google\x03com\x00\x00\x01\x00\x01",
358 ):
359 if not self.isconnected(): # WiFi is down
360 return False
361 length = 32 # DNS query and response packet size
362 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
363 s.setblocking(False)
364 s.connect(("8.8.8.8", 53))
365 await asyncio.sleep(1)
366 async with self.lock:
367 try:
368 await self._as_write(packet, sock=s)
369 await asyncio.sleep(2)
370 res = await self._as_read(length, s)
371 if len(res) == length:
372 return True # DNS response size OK
373 except OSError: # Timeout on read: no connectivity.
374 return False
375 finally:
376 s.close()
377 return False
378
379 async def broker_up(self): # Test broker connectivity
380 if not self.isconnected():
381 return False
382 tlast = self.last_rx
383 if ticks_diff(ticks_ms(), tlast) < 1000:
384 return True
385 try:
386 await self._ping()
387 except OSError:
388 return False
389 t = ticks_ms()
390 while not self._timeout(t):
391 await asyncio.sleep_ms(100)
392 if ticks_diff(self.last_rx, tlast) > 0: # Response received
393 return True
394 return False
395
396 async def disconnect(self):
397 if self._sock is not None:
398 await self._kill_tasks(False) # Keep socket open
399 try:
400 async with self.lock:
401 self._sock.write(b"\xe0\0") # Close broker connection
402 await asyncio.sleep_ms(100)
403 except OSError:
404 pass
405 self._close()
406 self._has_connected = False
407
408 def _close(self):
409 if self._sock is not None:
410 self._sock.close()
411
412 def close(
413 self,
414 ): # API. See https://github.com/peterhinch/micropython-mqtt/issues/60
415 self._close()
416
417 async def _await_pid(self, pid):
418 t = ticks_ms()
419 while pid in self.rcv_pids: # local copy
420 if self._timeout(t) or not self.isconnected():
421 break # Must repub or bail out
422 await asyncio.sleep_ms(100)
423 else:
424 return True # PID received. All done.
425 return False
426
427 # qos == 1: coro blocks until wait_msg gets correct PID.
428 # If WiFi fails completely subclass re-publishes with new PID.
429 async def publish(self, topic, msg, retain, qos, properties=None):
430 pid = next(self.newpid)
431 if qos:
432 self.rcv_pids.add(pid)
433 async with self.lock:
434 await self._publish(topic, msg, retain, qos, 0, pid, properties)
435 if qos == 0:
436 return
437
438 count = 0
439 while 1: # Await PUBACK, republish on timeout
440 if await self._await_pid(pid):
441 return
442 # No match
443 if count >= self._max_repubs or not self.isconnected():
444 raise OSError(-1) # Subclass to re-publish with new PID
445 async with self.lock:
446 # Add pid
447 await self._publish(
448 topic, msg, retain, qos, dup=1, pid=pid, properties=properties
449 )
450 count += 1
451 self.REPUB_COUNT += 1
452
453 async def _publish(self, topic, msg, retain, qos, dup, pid, properties=None):
454 pkt = bytearray(b"\x30\0\0\0")
455 pkt[0] |= qos << 1 | retain | dup << 3
456 sz = 2 + len(topic) + len(msg)
457 if qos > 0:
458 sz += 2
459
460 if self.mqttv5:
461 properties = encode_properties(properties)
462 sz += len(properties)
463
464 await self._as_write(pkt, vbi(pkt, 1, sz)) # Encode size as VBI
465 await self._send_str(topic)
466 if qos > 0:
467 struct.pack_into("!H", pkt, 0, pid)
468 await self._as_write(pkt, 2)
469 if self.mqttv5:
470 await self._as_write(properties)
471 await self._as_write(msg)
472
473 async def subscribe(self, topic, qos, properties=None):
474 await self._usub(topic, qos, properties)
475
476 async def unsubscribe(self, topic, properties=None):
477 await self._usub(topic, None, properties)
478
479 # Subscribe/unsubscribe
480 # Can raise OSError if WiFi fails. Subclass traps.
481 async def _usub(self, topic, qos, properties):
482 sub = qos is not None
483 pkt = bytearray(7)
484 pkt[0] = 0x82 if sub else 0xA2
485 pid = next(self.newpid)
486 self.rcv_pids.add(pid)
487 # 2 bytes of PID + 2 bytes of topic length + len(topic)
488 sz = 2 + 2 + len(topic) + (1 if sub else 0)
489 if self.mqttv5:
490 # Return length as VBI followed by properties or b'\0'
491 properties = encode_properties(properties)
492 sz += len(properties)
493 offs = vbi(pkt, 1, sz) # Store size as variable byte integer
494 struct.pack_into("!H", pkt, offs, pid)
495
496 async with self.lock:
497 await self._as_write(pkt, offs + 2)
498 if self.mqttv5:
499 await self._as_write(properties)
500 await self._send_str(topic)
501 if sub:
502 # Only QoS is supported other features such as:
503 # (NL) No Local, (RAP) Retain As Published and Retain Handling.
504 # Are not supported.
505 await self._as_write(qos.to_bytes(1, "little"))
506
507 if not await self._await_pid(pid):
508 raise OSError(-1)
509
510 # Remove a pending pid after a successful receive.
511 def kill_pid(self, pid, msg):
512 if pid in self.rcv_pids:
513 self.rcv_pids.discard(pid)
514 else:
515 raise OSError(-1, f"Invalid pid in {msg} packet")
516
517 # Wait for a single incoming MQTT message and process it.
518 # Subscribed messages are delivered to a callback previously
519 # set by .setup() method. Other (internal) MQTT
520 # messages processed internally.
521 # Immediate return if no data available. Called from ._handle_msg().
522 async def wait_msg(self):
523 mqttv5 = self.mqttv5 # Cache local
524 try:
525 res = self._sock.read(1) # Throws OSError on WiFi fail
526 except OSError as e:
527 if e.args[0] in BUSY_ERRORS: # Needed by RP2
528 await asyncio.sleep_ms(0)
529 return
530 raise
531
532 if res is None:
533 return
534 if res == b"":
535 raise OSError(-1, "Empty response") # Can happen on broker fail
536
537 if res == b"\xd0": # PINGRESP
538 await self._as_read(1) # Update .last_rx time
539 return
540 op = res[0]
541
542 if op == 0x40: # PUBACK
543 sz, _ = await self._recv_len()
544 if not mqttv5 and sz != 2:
545 raise OSError(-1, "Invalid PUBACK packet")
546 rcv_pid = await self._as_read(2)
547 pid = rcv_pid[0] << 8 | rcv_pid[1]
548 # For some reason even on MQTTv5 reason code is optional
549 if sz != 2:
550 reason_code = await self._as_read(1)
551 reason_code = reason_code[0]
552 if reason_code >= 0x80:
553 raise OSError(-1, "PUBACK reason code 0x%x" % reason_code)
554 if sz > 3:
555 puback_props_sz, _ = await self._recv_len()
556 if puback_props_sz > 0:
557 puback_props = await self._as_read(puback_props_sz)
558 decoded_props = decode_properties(puback_props, puback_props_sz)
559 self.dprint("PUBACK properties %s", decoded_props)
560 # No exception thrown: PUBACK successfuly received. Remove pending PID
561 self.kill_pid(pid, "PUBACK")
562
563 if op == 0x90 or op == 0xB0: # [UN]SUBACK
564 un = "UN" if op == 0xB0 else ""
565 suback = op == 0x90
566 sz, _ = await self._recv_len()
567 rcv_pid = await self._as_read(2)
568 pid = rcv_pid[0] << 8 | rcv_pid[1]
569 sz -= 2
570 # Handle properties
571 if mqttv5:
572 suback_props_sz, sz_len = await self._recv_len()
573 sz -= sz_len
574 sz -= suback_props_sz
575 if suback_props_sz > 0:
576 suback_props = await self._as_read(suback_props_sz)
577 decoded_props = decode_properties(suback_props, suback_props_sz)
578 self.dprint("[UN] SUBACK properties %s", decoded_props)
579
580 if sz > 1:
581 raise OSError(-1, "Got too many bytes")
582 if suback or mqttv5:
583 reason_code = await self._as_read(sz)
584 reason_code = reason_code[0]
585 if reason_code >= 0x80:
586 raise OSError(-1, f"{un}SUBACK reason code 0x{reason_code:x}")
587 self.kill_pid(pid, f"{un}SUBACK")
588
589 if op == 0xE0: # DISCONNECT
590 if mqttv5:
591 sz, _ = await self._recv_len()
592 reason_code = await self._as_read(1)
593 reason_code = reason_code[0]
594
595 sz -= 1
596 if sz > 0:
597 dis_props_sz, dis_len = await self._recv_len()
598 sz -= dis_len
599 disconnect_props = await self._as_read(dis_props_sz)
600 decoded_props = decode_properties(disconnect_props, dis_props_sz)
601 self.dprint("DISCONNECT properties %s", decoded_props)
602
603 if reason_code >= 0x80:
604 raise OSError(-1, "DISCONNECT reason code 0x%x" % reason_code)
605
606 if op & 0xF0 != 0x30:
607 return
608
609 sz, _ = await self._recv_len()
610 topic_len = await self._as_read(2)
611 topic_len = (topic_len[0] << 8) | topic_len[1]
612 topic = await self._as_read(topic_len)
613 topic = bytes(topic) # Copy before re-using the read buffer
614 sz -= topic_len + 2
615 # MQTT V3.1.1 section 2.3.1 non-normative comment. Get server PID.
616 if op & 6: # This is distinct from client PIDs.
617 pid = await self._as_read(2)
618 pid = pid[0] << 8 | pid[1]
619 sz -= 2
620
621 decoded_props = None
622 if mqttv5:
623 pub_props_sz, pub_props_sz_len = await self._recv_len()
624 sz -= pub_props_sz_len
625 sz -= pub_props_sz
626 if pub_props_sz > 0:
627 pub_props = await self._as_read(pub_props_sz)
628 decoded_props = decode_properties(pub_props, pub_props_sz)
629
630 msg = await self._as_read(sz)
631 # In event mode we must copy the message otherwise .queue contents will be wrong:
632 # every entry would contain the same message.
633 # In callback mode not copying the message is OK so long as the callback is purely
634 # synchronous. Overruns can't occur because of the lock.
635 if self._events or MSG_BYTES:
636 msg = bytes(msg)
637 retained = op & 0x01
638 args = [topic, msg, bool(retained)]
639 if mqttv5:
640 args.append(decoded_props)
641 self._cb(*args)
642
643 if op & 6 == 2: # qos 1
644 pkt = bytearray(b"\x40\x02\0\0") # Send PUBACK
645 struct.pack_into("!H", pkt, 2, pid)
646 await self._as_write(pkt)
647 elif op & 6 == 4: # qos 2 not supported
648 raise OSError(-1, "QoS 2 not supported")
649
650
651# MQTTClient class. Handles issues relating to connectivity.
652
653
654class MQTTClient(MQTT_base):
655 def __init__(self, config):
656 super().__init__(config)
657 self._isconnected = False # Current connection state
658 keepalive = 1000 * self._keepalive # ms
659 self._ping_interval = keepalive // 4 if keepalive else 20000
660 p_i = (
661 config["ping_interval"] * 1000
662 ) # Can specify shorter e.g. for subscribe-only
663 if p_i and p_i < self._ping_interval:
664 self._ping_interval = p_i
665 self._in_connect = False
666 self._has_connected = False # Define 'Clean Session' value to use.
667 self._tasks = []
668
669 async def connect(
670 self, *, quick=False
671 ): # Quick initial connect option for battery apps
672 if not self._has_connected:
673 # await self.wifi_connect(quick) # On 1st call, caller handles error
674 # Note this blocks if DNS lookup occurs. Do it once to prevent
675 # blocking during later internet outage:
676 self._addr = socket.getaddrinfo(self.server, self.port)[0][-1]
677 print(self._addr)
678 self._in_connect = True # Disable low level ._isconnected check
679 try:
680 is_clean = self._clean
681 if not self._has_connected and self._clean_init and not self._clean:
682 if self.mqttv5:
683 is_clean = True
684 else:
685 # Power up. Clear previous session data but subsequently save it.
686 # Issue #40
687 await self._connect(True) # Connect with clean session
688 try:
689 async with self.lock:
690 self._sock.write(
691 b"\xe0\0"
692 ) # Force disconnect but keep socket open
693 except OSError:
694 pass
695 self.dprint("Waiting for disconnect")
696 await asyncio.sleep(2) # Wait for broker to disconnect
697 self.dprint("About to reconnect with unclean session.")
698 await self._connect(is_clean)
699 except Exception:
700 self._close()
701 self._in_connect = False # Caller may run .isconnected()
702 raise
703 self.rcv_pids.clear()
704 # If we get here without error broker/LAN must be up.
705 self._isconnected = True
706 self._in_connect = False # Low level code can now check connectivity.
707 if not self._events:
708 asyncio.create_task(self._wifi_handler(True)) # User handler.
709 if not self._has_connected:
710 self._has_connected = True # Use normal clean flag on reconnect.
711 asyncio.create_task(self._keep_connected())
712 # Runs forever unless user issues .disconnect()
713
714 asyncio.create_task(self._handle_msg()) # Task quits on connection fail.
715 self._tasks.append(asyncio.create_task(self._keep_alive()))
716 if self.DEBUG:
717 self._tasks.append(asyncio.create_task(self._memory()))
718 if self._events:
719 self.up.set() # Connectivity is up
720 else:
721 asyncio.create_task(self._connect_handler(self)) # User handler.
722
723 # Launched by .connect(). Runs until connectivity fails. Checks for and
724 # handles incoming messages.
725 async def _handle_msg(self):
726 try:
727 while self.isconnected():
728 async with self.lock:
729 await self.wait_msg() # Immediate return if no message
730 # https://github.com/peterhinch/micropython-mqtt/issues/166
731 # A delay > 0 is necessary for webrepl compatibility.
732 await asyncio.sleep_ms(5) # Let other tasks get lock
733
734 except OSError:
735 pass
736 self._reconnect() # Broker or WiFi fail.
737
738 # Keep broker alive MQTT spec 3.1.2.10 Keep Alive.
739 # Runs until ping failure or no response in keepalive period.
740 async def _keep_alive(self):
741 while self.isconnected():
742 pings_due = ticks_diff(ticks_ms(), self.last_rx) // self._ping_interval
743 if pings_due >= 4:
744 self.dprint("Reconnect: broker fail.")
745 break
746 await asyncio.sleep_ms(self._ping_interval)
747 try:
748 await self._ping()
749 except OSError:
750 break
751 self._reconnect() # Broker or WiFi fail.
752
753 async def _kill_tasks(self, kill_skt): # Cancel running tasks
754 for task in self._tasks:
755 task.cancel()
756 self._tasks.clear()
757 await asyncio.sleep_ms(0) # Ensure cancellation complete
758 if kill_skt: # Close socket
759 self._close()
760
761 # DEBUG: show RAM messages.
762 async def _memory(self):
763 while True:
764 await asyncio.sleep(20)
765 gc.collect()
766 self.dprint("RAM free %d alloc %d", gc.mem_free(), gc.mem_alloc())
767
768 def isconnected(self):
769 if self._in_connect: # Disable low-level check during .connect()
770 return True
771
772 return self._isconnected
773
774 def _reconnect(self): # Schedule a reconnection if not underway.
775 if self._isconnected:
776 self._isconnected = False
777 asyncio.create_task(self._kill_tasks(True)) # Shut down tasks and socket
778 if self._events: # Signal an outage
779 self.down.set()
780 else:
781 asyncio.create_task(self._wifi_handler(False)) # User handler.
782
783 # Await broker connection.
784 async def _connection(self):
785 while not self._isconnected:
786 await asyncio.sleep(1)
787
788 # Scheduled on 1st successful connection. Runs forever maintaining wifi and
789 # broker connection. Must handle conditions at edge of WiFi range.
790 async def _keep_connected(self):
791 while self._has_connected:
792 if self.isconnected(): # Pause for 1 second
793 await asyncio.sleep(1)
794 gc.collect()
795 else: # Link is down, socket is closed, tasks are killed
796 if (
797 not self._has_connected
798 ): # User has issued the terminal .disconnect()
799 self.dprint("Disconnected, exiting _keep_connected")
800 break
801 try:
802 await self.connect()
803 # Now has set ._isconnected and scheduled _connect_handler().
804 self.dprint("Reconnect OK!")
805 except OSError as e:
806 self.dprint("Error in reconnect. %s", e)
807 # Can get ECONNABORTED or -1. The latter signifies no or bad CONNACK received.
808 self._close() # Disconnect and try again.
809 self._in_connect = False
810 self._isconnected = False
811 self.dprint("Disconnected, exited _keep_connected")
812
813 async def subscribe(self, topic, qos=0, properties=None):
814 qos_check(qos)
815 while 1:
816 await self._connection()
817 try:
818 return await super().subscribe(topic, qos, properties)
819 except OSError:
820 pass
821 self._reconnect() # Broker or WiFi fail.
822
823 async def unsubscribe(self, topic, properties=None):
824 while 1:
825 await self._connection()
826 try:
827 return await super().unsubscribe(topic, properties)
828 except OSError:
829 pass
830 self._reconnect() # Broker or WiFi fail.
831
832 async def publish(self, topic, msg, retain=False, qos=0, properties=None):
833 qos_check(qos)
834 while 1:
835 await self._connection()
836 try:
837 return await super().publish(topic, msg, retain, qos, properties)
838 except OSError:
839 pass
840 self._reconnect() # Broker or WiFi fail.
841
842
843# =======================
844# mqtt_test.py
845# =======================
846
847import json
848import asyncio
849
850
851class MqttModule:
852 def __init__(self):
853 self.client: MQTTClient = None
854
855 async def _net_down_watcher(self):
856 while True:
857 await self.client.down.wait()
858 self.client.down.clear()
859 print("!!!disconnected")
860
861 async def _subscriber(self):
862 while True:
863 await self.client.up.wait()
864 self.client.up.clear()
865 print("connected to broker")
866
867 topic = "/cassia/test/down"
868 await self.client.subscribe(topic)
869 print(f"sub topic ok: {topic}")
870
871 async def _messager(self):
872 async for topic, msg, retained in self.client.queue:
873 print("recv:", topic, msg, retained)
874
875 async def _start(self):
876 config = {
877 "client_id": "CC:1B:E0:00:00:01",
878 "server": "test.mosquitto.org",
879 "port": "1884",
880 "user": "rw",
881 "password": "readwrite",
882 "keepalive": 60,
883 "ping_interval": 0,
884 "ssl": False,
885 "ssl_params": {},
886 "response_time": 10,
887 "clean_init": True,
888 "clean": True,
889 "max_repubs": 4,
890 "will": None,
891 "subs_cb": lambda *_: None,
892 "ssid": None,
893 "wifi_pw": None,
894 "queue_len": 64,
895 "gateway": False,
896 "mqttv5": False,
897 "mqttv5_con_props": None,
898 }
899 MQTTClient.DEBUG = True
900 self.client = MQTTClient(config=config)
901
902 while True:
903 try:
904 print("connect start...")
905 await self.client.connect()
906 print("connect ok")
907 break
908 except Exception as e:
909 print("connect failed, waiting retry...")
910 await asyncio.sleep(3)
911
912 async def pub(self, topic: str, data: dict, qos: int = 0):
913 msg = json.dumps(data)
914 print("pub:", topic, msg)
915 await self.client.publish(topic, msg, qos=qos)
916
917 async def _heartbeater(self):
918 while True:
919 await self.pub("/cassia/test/up", {"hello": "world"}, qos=1)
920 await asyncio.sleep(3)
921
922 def co_tasks(self) -> list[asyncio.Task]:
923 return [
924 asyncio.create_task(self._net_down_watcher()),
925 asyncio.create_task(self._subscriber()),
926 asyncio.create_task(self._messager()),
927 asyncio.create_task(self._heartbeater()),
928 ]
929
930 async def start(self):
931 await self._start()
932
933
934async def main():
935 mqtt = MqttModule()
936 await mqtt.start()
937
938 tasks = mqtt.co_tasks()
939 await asyncio.gather(*tasks)
940
941
942asyncio.run(main())