micropython-mqtt

The MQTT library currently in use is the open-source micropython-mqtt library. For details, refer to the micropython-mqtt project.

Note

  • The gateway does not have this library integrated by default; you will need to manually select and incorporate it into the script for use.

  • To ensure compatibility for operation within the gateway, the example has removed network stack-related processing.

Example

Description

For quick and convenient testing, the example uses the public test broker provided by test.mosquitto.org.

Parameter Description

| Parameter        | Description      | Example            |
|------------------|------------------|--------------------|
| client_id        | Client ID        | CC:1B:E0:00:00:01  |
| server           | MQTT Broker host | test.mosquitto.org |
| port             | MQTT Broker port | 1884               |
| user             | Username         | rw                 |
| password         | Password         | readwrite          |
| upstream topic   | Publish Topic    | /cassia/test/up    |
| downstream topic | Subscribe Topic  | /cassia/test/down  |

The testing tools use mosquitto_sub/mosquitto_pub. You may also use MQTTX or test scripts for this purpose.

# Subscribe test topic
mosquitto_sub -h test.mosquitto.org -p 1884 -u rw -P readwrite -t '/cassia/test/up'

# Publish test message
mosquitto_pub -h test.mosquitto.org -p 1884 -u rw -P readwrite -t '/cassia/test/down' -m '{"hello":"world"}'

Code

  1# ===========================
  2# micropython-mqtt mqtt_as.py
  3# ===========================
  4
  5# mqtt_as.py Asynchronous version of umqtt.robust
  6# (C) Copyright Peter Hinch 2017-2025.
  7# Released under the MIT licence.
  8
  9# Pyboard D support added also RP2/default
 10# Various improvements contributed by Kevin Köck
 11# V5 support added by Bob Veringa.
 12# Also other contributors.
 13
 14import gc
 15import socket
 16import struct
 17
 18gc.collect()
 19import asyncio
 20
 21gc.collect()
 22from time import ticks_ms, ticks_diff
 23from errno import EINPROGRESS, ETIMEDOUT
 24
 25gc.collect()
 26from sys import platform
 27
VERSION = (0, 8, 4)
# Default initial size for input message buffer. Increase this if large messages
# are expected, but rarely, to avoid big runtime allocations
IBUFSIZE = 50
# By default the callback interface returns an incoming message as bytes.
# For performance reasons with large messages it may return a memoryview.
MSG_BYTES = True

# Legitimate errors while waiting on a socket. See uasyncio __init__.py open_connection().
# BUSY_ERRORS lists the OSError codes that the socket read/write loops treat as
# "try again" rather than as a failure; the set depends on the platform.
ESP32 = platform == "esp32"
RP2 = platform == "rp2"
if ESP32:
    # https://forum.micropython.org/viewtopic.php?f=16&t=3608&p=20942#p20942
    BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT, 118, 119]  # Add in weird ESP32 errors
elif RP2:
    BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT, -110]
else:
    from errno import ENOTCONN

    BUSY_ERRORS = [ENOTCONN, EINPROGRESS, ETIMEDOUT]
 48
 49
 50# Default "do little" coro for optional user replacement
 51async def eliza(*_):  # e.g. via set_wifi_handler(coro): see test program
 52    await asyncio.sleep_ms(0)
 53
 54
 55class MsgQueue:
 56    def __init__(self, size):
 57        self._q = [0 for _ in range(max(size, 4))]
 58        self._size = size
 59        self._wi = 0
 60        self._ri = 0
 61        self._evt = asyncio.Event()
 62        self.discards = 0
 63
 64    def put(self, *v):
 65        self._q[self._wi] = v
 66        self._evt.set()
 67        self._wi = (self._wi + 1) % self._size
 68        if self._wi == self._ri:  # Would indicate empty
 69            self._ri = (self._ri + 1) % self._size  # Discard a message
 70            self.discards += 1
 71
 72    def __aiter__(self):
 73        return self
 74
 75    async def __anext__(self):
 76        if self._ri == self._wi:  # Empty
 77            self._evt.clear()
 78            await self._evt.wait()
 79        r = self._q[self._ri]
 80        self._ri = (self._ri + 1) % self._size
 81        return r
 82
 83
class MQTTException(Exception):
    # Library-specific exception type. It adds nothing to Exception and is not
    # raised anywhere in the portion of the file shown here.
    pass
 86
 87
 88def pid_gen():
 89    pid = 0
 90    while True:
 91        pid = pid + 1 if pid < 65535 else 1
 92        yield pid
 93
 94
 95def qos_check(qos):
 96    if not (qos == 0 or qos == 1):
 97        raise ValueError("Only qos 0 and 1 are supported.")
 98
 99
# Write x into buf as an MQTT variable byte integer, starting at offset offs.
# Returns the offset just past the last byte written.
# 1-4 bytes allowed, encoding up to 268,435,455 (V3.1.1 table 2.4). No point trapping this.
def vbi(buf: bytearray, offs: int, x: int):
    while True:
        low7 = x & 0x7F  # Seven payload bits per output byte, least significant first
        x >>= 7
        if not x:
            buf[offs] = low7  # Final byte: continuation bit clear
            return offs + 1
        buf[offs] = low7 | 0x80  # More bytes follow
        offs += 1
108
109
# Placeholders for the MQTT v5 property codec. MQTT_base.__init__ rebinds both
# names (via a global import from .mqtt_v5_properties) when config "mqttv5" is truthy.
encode_properties = None
decode_properties = None
112
113
114class MQTT_base:
115    REPUB_COUNT = 0  # TEST
116    DEBUG = False
117
118    def __init__(self, config):
119        self._events = config["queue_len"] > 0
120        # MQTT config
121        self._client_id = config["client_id"]
122        self._user = config["user"]
123        self._pswd = config["password"]
124        self._keepalive = config["keepalive"]
125        if self._keepalive >= 65536:
126            raise ValueError("invalid keepalive time")
127        self._response_time = (
128            config["response_time"] * 1000
129        )  # Repub if no PUBACK received (ms).
130        self._max_repubs = config["max_repubs"]
131        self._clean_init = config[
132            "clean_init"
133        ]  # clean_session state on first connection
134        self._clean = config["clean"]  # clean_session state on reconnect
135        will = config["will"]
136        if will is None:
137            self._lw_topic = False
138        else:
139            self._set_last_will(*will)
140        self._ssl = config["ssl"]
141        self._ssl_params = config["ssl_params"]
142        # Callbacks and coros
143        if self._events:
144            self.up = asyncio.Event()
145            self.down = asyncio.Event()
146            self.queue = MsgQueue(config["queue_len"])
147            self._cb = self.queue.put
148        else:  # Callbacks
149            self._cb = config["subs_cb"]
150            self._wifi_handler = config["wifi_coro"]
151            self._connect_handler = config["connect_coro"]
152        # Network
153        self.port = config["port"]
154        if self.port == 0:
155            self.port = 8883 if self._ssl else 1883
156        self.server = config["server"]
157        if self.server is None:
158            raise ValueError("no server specified.")
159        self._sock = None
160
161        self.newpid = pid_gen()
162        self.rcv_pids = set()  # PUBACK and SUBACK pids awaiting ACK response
163        self.last_rx = ticks_ms()  # Time of last communication from broker
164        self.lock = asyncio.Lock()
165        self._ibuf = bytearray(IBUFSIZE)
166        self._mvbuf = memoryview(self._ibuf)
167
168        self.mqttv5 = config.get("mqttv5")
169        self.mqttv5_con_props = config.get("mqttv5_con_props")
170        self.topic_alias_maximum = 0
171
172        if self.mqttv5:
173            global encode_properties, decode_properties
174            from .mqtt_v5_properties import encode_properties, decode_properties  # noqa
175
176    def _set_last_will(self, topic, msg, retain=False, qos=0):
177        qos_check(qos)
178        if not topic:
179            raise ValueError("Empty topic.")
180        self._lw_topic = topic
181        self._lw_msg = msg
182        self._lw_qos = qos
183        self._lw_retain = retain
184
185    def dprint(self, msg, *args):
186        if self.DEBUG:
187            print(msg % args)
188
189    def _timeout(self, t):
190        return ticks_diff(ticks_ms(), t) > self._response_time
191
    async def _as_read(self, n, sock=None):  # OSError caught by subclass
        """Read exactly *n* bytes from *sock* (default: the broker socket).

        Returns a memoryview over the first n bytes of the shared input
        buffer, so the data is overwritten by the next read; callers that keep
        it must copy it. Raises OSError on timeout, on loss of connection, when
        the peer closes the socket, or on any socket error not in BUSY_ERRORS.
        """
        if sock is None:
            sock = self._sock
        # Ensure input buffer is big enough to hold data. It keeps the new size
        oflow = n - len(self._ibuf)
        if oflow > 0:  # Grow the buffer and re-create the memoryview
            # Avoid too frequent small allocations by adding some extra bytes
            self._ibuf.extend(bytearray(oflow + 50))
            self._mvbuf = memoryview(self._ibuf)
        buffer = self._mvbuf
        size = 0  # Bytes received so far
        t = ticks_ms()  # Restarted whenever data arrives
        while size < n:
            if self._timeout(t) or not self.isconnected():
                raise OSError(-1, "Timeout on socket read")
            try:
                msg_size = sock.readinto(buffer[size:], n - size)
            except OSError as e:  # ESP32 issues weird 119 errors here
                msg_size = None  # Treated the same as "no data yet"
                if e.args[0] not in BUSY_ERRORS:
                    raise
            if msg_size == 0:  # Connection closed by host
                raise OSError(-1, "Connection closed by host")
            if msg_size is not None:  # data received
                size += msg_size
                t = ticks_ms()
                self.last_rx = ticks_ms()
            await asyncio.sleep_ms(0)  # Yield to other tasks between polls
        return buffer[:n]
221
222    async def _as_write(self, bytes_wr, length=0, sock=None):
223        if sock is None:
224            sock = self._sock
225
226        # Wrap bytes in memoryview to avoid copying during slicing
227        bytes_wr = memoryview(bytes_wr)
228        if length:
229            bytes_wr = bytes_wr[:length]
230        t = ticks_ms()
231        while bytes_wr:
232            if self._timeout(t) or not self.isconnected():
233                raise OSError(-1, "Timeout on socket write")
234            try:
235                n = sock.write(bytes_wr)
236            except OSError as e:  # ESP32 issues weird 119 errors here
237                n = 0
238                if e.args[0] not in BUSY_ERRORS:
239                    raise
240            if n:
241                t = ticks_ms()
242                bytes_wr = bytes_wr[n:]
243            await asyncio.sleep_ms(0)
244
245    async def _send_str(self, s):
246        await self._as_write(struct.pack("!H", len(s)))
247        await self._as_write(s)
248
249    # Receive a Variable Byte Integer and decode.
250    async def _recv_len(self, d=0, i=0):
251        s = (await self._as_read(1))[0]
252        d |= (s & 0x7F) << (i * 7)
253        return await self._recv_len(d, i + 1) if (s & 0x80) else (d, i + 1)
254
    async def _connect(self, clean):
        """Open a socket to the broker, send CONNECT and validate the CONNACK.

        *clean* is written into the connect flags shifted left by one bit.
        On MQTT v5 the CONNACK properties are decoded and the value stored
        under key 0x22 (default 0) is saved as ``topic_alias_maximum``.

        Raises OSError for socket errors not in BUSY_ERRORS and for a missing,
        malformed or non-zero CONNACK. The socket is not closed here on failure.
        """
        mqttv5 = self.mqttv5  # Cache local
        self._sock = socket.socket()
        self._sock.setblocking(False)
        try:
            self._sock.connect(self._addr)
        except OSError as e:
            # A non-blocking connect normally reports "in progress"
            if e.args[0] not in BUSY_ERRORS:
                raise
        await asyncio.sleep_ms(0)
        self.dprint("Connecting to broker.")
        if self._ssl:
            try:
                import ssl
            except ImportError:
                import ussl as ssl

            self._sock = ssl.wrap_socket(self._sock, **self._ssl_params)
        # premsg: packet type byte, room for a 4-byte remaining length, plus one
        # zero byte. msg: low byte of the protocol-name length, "MQTT", protocol
        # level, connect flags and the two keepalive bytes.
        premsg = bytearray(b"\x10\0\0\0\0\0")
        msg = bytearray(b"\x04MQTT\x00\0\0\0")
        msg[5] = 0x05 if mqttv5 else 0x04

        sz = 10 + 2 + len(self._client_id)
        msg[6] = clean << 1
        if self._user:
            sz += 2 + len(self._user) + 2 + len(self._pswd)
            msg[6] |= 0xC0  # Username and password flags
        if self._keepalive:
            msg[7] |= self._keepalive >> 8
            msg[8] |= self._keepalive & 0x00FF
        if self._lw_topic:
            sz += 2 + len(self._lw_topic) + 2 + len(self._lw_msg)
            if mqttv5:
                # Extra for the will properties
                sz += 1
            msg[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3
            msg[6] |= self._lw_retain << 5

        if mqttv5:
            properties = encode_properties(self.mqttv5_con_props)
            sz += len(properties)

        i = vbi(premsg, 1, sz)  # sz -> Variable Byte Integer
        # i + 1: also send the zero byte after the length field; together with
        # msg[0] it forms the 16-bit protocol-name length (0x0004).
        await self._as_write(premsg, i + 1)
        await self._as_write(msg)
        if mqttv5:
            await self._as_write(properties)

        await self._send_str(self._client_id)
        if self._lw_topic:
            if mqttv5:
                # We don't support will properties, so we send 0x00 for properties length
                await self._as_write(b"\x00")
            await self._send_str(self._lw_topic)
            await self._send_str(self._lw_msg)
        if self._user:
            await self._send_str(self._user)
            await self._send_str(self._pswd)
        # Await CONNACK
        # read causes ECONNABORTED if broker is out; triggers a reconnect.
        del premsg, msg
        packet_type = await self._as_read(1)
        if packet_type[0] != 0x20:
            raise OSError(-1, "CONNACK not received")
        # The connect packet has changed, so size might be different now. But
        # we can still handle it the same for 3.1.1 and v5
        sz, _ = await self._recv_len()
        if not mqttv5 and sz != 2:
            raise OSError(-1, "Invalid CONNACK packet")

        # Only read the first 2 bytes, as properties have their own length
        connack_resp = await self._as_read(2)

        # Connect ack flags
        if connack_resp[0] != 0:
            raise OSError(-1, "CONNACK flags not 0")
        # Reason code
        if connack_resp[1] != 0:
            # On MQTTv5 Reason codes below 128 may need to be handled
            # differently. For now, we just raise an error. Spec is a bit weird
            # on this.
            raise OSError(-1, "CONNACK reason code 0x%x" % connack_resp[1])

        del connack_resp
        if not mqttv5:
            # If we are not on MQTTv5 we can stop here
            return

        connack_props_length, _ = await self._recv_len()
        if connack_props_length > 0:
            connack_props = await self._as_read(connack_props_length)
            decoded_props = decode_properties(connack_props, connack_props_length)
            self.dprint("CONNACK properties: %s", decoded_props)
            self.topic_alias_maximum = decoded_props.get(0x22, 0)
349
350    async def _ping(self):
351        async with self.lock:
352            await self._as_write(b"\xc0\0")
353
354    # Check internet connectivity by sending DNS lookup to Google's 8.8.8.8
355    async def wan_ok(
356        self,
357        packet=b"$\x1a\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03www\x06google\x03com\x00\x00\x01\x00\x01",
358    ):
359        if not self.isconnected():  # WiFi is down
360            return False
361        length = 32  # DNS query and response packet size
362        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
363        s.setblocking(False)
364        s.connect(("8.8.8.8", 53))
365        await asyncio.sleep(1)
366        async with self.lock:
367            try:
368                await self._as_write(packet, sock=s)
369                await asyncio.sleep(2)
370                res = await self._as_read(length, s)
371                if len(res) == length:
372                    return True  # DNS response size OK
373            except OSError:  # Timeout on read: no connectivity.
374                return False
375            finally:
376                s.close()
377        return False
378
379    async def broker_up(self):  # Test broker connectivity
380        if not self.isconnected():
381            return False
382        tlast = self.last_rx
383        if ticks_diff(ticks_ms(), tlast) < 1000:
384            return True
385        try:
386            await self._ping()
387        except OSError:
388            return False
389        t = ticks_ms()
390        while not self._timeout(t):
391            await asyncio.sleep_ms(100)
392            if ticks_diff(self.last_rx, tlast) > 0:  # Response received
393                return True
394        return False
395
    async def disconnect(self):
        """Tell the broker we are leaving, close the socket and stop reconnecting.

        Relies on ``_kill_tasks`` and ``_has_connected``; in the code shown here
        both are defined by the MQTTClient subclass.
        """
        if self._sock is not None:
            await self._kill_tasks(False)  # Keep socket open
            try:
                async with self.lock:
                    self._sock.write(b"\xe0\0")  # Close broker connection
                    await asyncio.sleep_ms(100)
            except OSError:
                pass  # Best effort: the link may already be down
            self._close()
        # Cleared so that the _keep_connected loop exits
        self._has_connected = False
407
408    def _close(self):
409        if self._sock is not None:
410            self._sock.close()
411
412    def close(
413        self,
414    ):  # API. See https://github.com/peterhinch/micropython-mqtt/issues/60
415        self._close()
416
417    async def _await_pid(self, pid):
418        t = ticks_ms()
419        while pid in self.rcv_pids:  # local copy
420            if self._timeout(t) or not self.isconnected():
421                break  # Must repub or bail out
422            await asyncio.sleep_ms(100)
423        else:
424            return True  # PID received. All done.
425        return False
426
427    # qos == 1: coro blocks until wait_msg gets correct PID.
428    # If WiFi fails completely subclass re-publishes with new PID.
429    async def publish(self, topic, msg, retain, qos, properties=None):
430        pid = next(self.newpid)
431        if qos:
432            self.rcv_pids.add(pid)
433        async with self.lock:
434            await self._publish(topic, msg, retain, qos, 0, pid, properties)
435        if qos == 0:
436            return
437
438        count = 0
439        while 1:  # Await PUBACK, republish on timeout
440            if await self._await_pid(pid):
441                return
442            # No match
443            if count >= self._max_repubs or not self.isconnected():
444                raise OSError(-1)  # Subclass to re-publish with new PID
445            async with self.lock:
446                # Add pid
447                await self._publish(
448                    topic, msg, retain, qos, dup=1, pid=pid, properties=properties
449                )
450            count += 1
451            self.REPUB_COUNT += 1
452
453    async def _publish(self, topic, msg, retain, qos, dup, pid, properties=None):
454        pkt = bytearray(b"\x30\0\0\0")
455        pkt[0] |= qos << 1 | retain | dup << 3
456        sz = 2 + len(topic) + len(msg)
457        if qos > 0:
458            sz += 2
459
460        if self.mqttv5:
461            properties = encode_properties(properties)
462            sz += len(properties)
463
464        await self._as_write(pkt, vbi(pkt, 1, sz))  # Encode size as VBI
465        await self._send_str(topic)
466        if qos > 0:
467            struct.pack_into("!H", pkt, 0, pid)
468            await self._as_write(pkt, 2)
469        if self.mqttv5:
470            await self._as_write(properties)
471        await self._as_write(msg)
472
473    async def subscribe(self, topic, qos, properties=None):
474        await self._usub(topic, qos, properties)
475
476    async def unsubscribe(self, topic, properties=None):
477        await self._usub(topic, None, properties)
478
479    # Subscribe/unsubscribe
480    # Can raise OSError if WiFi fails. Subclass traps.
481    async def _usub(self, topic, qos, properties):
482        sub = qos is not None
483        pkt = bytearray(7)
484        pkt[0] = 0x82 if sub else 0xA2
485        pid = next(self.newpid)
486        self.rcv_pids.add(pid)
487        # 2 bytes of PID + 2 bytes of topic length + len(topic)
488        sz = 2 + 2 + len(topic) + (1 if sub else 0)
489        if self.mqttv5:
490            # Return length as VBI followed by properties or b'\0'
491            properties = encode_properties(properties)
492            sz += len(properties)
493        offs = vbi(pkt, 1, sz)  # Store size as variable byte integer
494        struct.pack_into("!H", pkt, offs, pid)
495
496        async with self.lock:
497            await self._as_write(pkt, offs + 2)
498            if self.mqttv5:
499                await self._as_write(properties)
500            await self._send_str(topic)
501            if sub:
502                # Only QoS is supported other features such as:
503                # (NL) No Local, (RAP) Retain As Published and Retain Handling.
504                # Are not supported.
505                await self._as_write(qos.to_bytes(1, "little"))
506
507        if not await self._await_pid(pid):
508            raise OSError(-1)
509
510    # Remove a pending pid after a successful receive.
511    def kill_pid(self, pid, msg):
512        if pid in self.rcv_pids:
513            self.rcv_pids.discard(pid)
514        else:
515            raise OSError(-1, f"Invalid pid in {msg} packet")
516
517    # Wait for a single incoming MQTT message and process it.
518    # Subscribed messages are delivered to a callback previously
519    # set by .setup() method. Other (internal) MQTT
520    # messages processed internally.
521    # Immediate return if no data available. Called from ._handle_msg().
    async def wait_msg(self):
        """Process at most one incoming packet from the broker socket.

        Returns immediately when no byte is available. PINGRESP, PUBACK,
        SUBACK/UNSUBACK and (v5 only) DISCONNECT are handled internally; a
        PUBLISH is passed to the message callback as (topic, msg, retained),
        plus the decoded properties on MQTT v5, and is acknowledged with a
        PUBACK when it arrived with qos 1.

        Raises OSError on socket failure, malformed packets, failure reason
        codes (>= 0x80), unknown pids and incoming qos 2 messages.
        """
        mqttv5 = self.mqttv5  # Cache local
        try:
            res = self._sock.read(1)  # Throws OSError on WiFi fail
        except OSError as e:
            if e.args[0] in BUSY_ERRORS:  # Needed by RP2
                await asyncio.sleep_ms(0)
                return
            raise

        if res is None:  # Nothing available on the non-blocking socket
            return
        if res == b"":
            raise OSError(-1, "Empty response")  # Can happen on broker fail

        if res == b"\xd0":  # PINGRESP
            await self._as_read(1)  # Update .last_rx time
            return
        op = res[0]

        if op == 0x40:  # PUBACK
            sz, _ = await self._recv_len()
            if not mqttv5 and sz != 2:
                raise OSError(-1, "Invalid PUBACK packet")
            rcv_pid = await self._as_read(2)
            pid = rcv_pid[0] << 8 | rcv_pid[1]
            # For some reason even on MQTTv5 reason code is optional
            if sz != 2:
                reason_code = await self._as_read(1)
                reason_code = reason_code[0]
                if reason_code >= 0x80:
                    raise OSError(-1, "PUBACK reason code 0x%x" % reason_code)
            if sz > 3:
                puback_props_sz, _ = await self._recv_len()
                if puback_props_sz > 0:
                    puback_props = await self._as_read(puback_props_sz)
                    decoded_props = decode_properties(puback_props, puback_props_sz)
                    self.dprint("PUBACK properties %s", decoded_props)
            # No exception thrown: PUBACK successfuly received. Remove pending PID
            self.kill_pid(pid, "PUBACK")

        if op == 0x90 or op == 0xB0:  # [UN]SUBACK
            un = "UN" if op == 0xB0 else ""
            suback = op == 0x90
            sz, _ = await self._recv_len()
            rcv_pid = await self._as_read(2)
            pid = rcv_pid[0] << 8 | rcv_pid[1]
            sz -= 2  # From here sz counts the bytes left after the pid
            # Handle properties
            if mqttv5:
                suback_props_sz, sz_len = await self._recv_len()
                sz -= sz_len
                sz -= suback_props_sz
                if suback_props_sz > 0:
                    suback_props = await self._as_read(suback_props_sz)
                    decoded_props = decode_properties(suback_props, suback_props_sz)
                    self.dprint("[UN] SUBACK properties %s", decoded_props)

            # More than one byte left means more than one reason code
            if sz > 1:
                raise OSError(-1, "Got too many bytes")
            if suback or mqttv5:
                # NOTE(review): if sz is 0 here the read returns an empty view
                # and the index below raises IndexError, not OSError - confirm
                # whether a broker can send that.
                reason_code = await self._as_read(sz)
                reason_code = reason_code[0]
                if reason_code >= 0x80:
                    raise OSError(-1, f"{un}SUBACK reason code 0x{reason_code:x}")
            self.kill_pid(pid, f"{un}SUBACK")

        if op == 0xE0:  # DISCONNECT
            if mqttv5:
                sz, _ = await self._recv_len()
                # NOTE(review): a reason-code byte is read even when sz is 0;
                # verify against the v5 spec whether an empty DISCONNECT is legal.
                reason_code = await self._as_read(1)
                reason_code = reason_code[0]

                sz -= 1
                if sz > 0:
                    dis_props_sz, dis_len = await self._recv_len()
                    sz -= dis_len
                    disconnect_props = await self._as_read(dis_props_sz)
                    decoded_props = decode_properties(disconnect_props, dis_props_sz)
                    self.dprint("DISCONNECT properties %s", decoded_props)

                if reason_code >= 0x80:
                    raise OSError(-1, "DISCONNECT reason code 0x%x" % reason_code)

        # Everything below handles an incoming PUBLISH only
        if op & 0xF0 != 0x30:
            return

        sz, _ = await self._recv_len()
        topic_len = await self._as_read(2)
        topic_len = (topic_len[0] << 8) | topic_len[1]
        topic = await self._as_read(topic_len)
        topic = bytes(topic)  # Copy before re-using the read buffer
        sz -= topic_len + 2
        # MQTT V3.1.1 section 2.3.1 non-normative comment. Get server PID.
        if op & 6:  # This is distinct from client PIDs.
            pid = await self._as_read(2)
            pid = pid[0] << 8 | pid[1]
            sz -= 2

        decoded_props = None
        if mqttv5:
            pub_props_sz, pub_props_sz_len = await self._recv_len()
            sz -= pub_props_sz_len
            sz -= pub_props_sz
            if pub_props_sz > 0:
                pub_props = await self._as_read(pub_props_sz)
                decoded_props = decode_properties(pub_props, pub_props_sz)

        # sz is now the payload length
        msg = await self._as_read(sz)
        # In event mode we must copy the message otherwise .queue contents will be wrong:
        # every entry would contain the same message.
        # In callback mode not copying the message is OK so long as the callback is purely
        # synchronous. Overruns can't occur because of the lock.
        if self._events or MSG_BYTES:
            msg = bytes(msg)
        retained = op & 0x01
        args = [topic, msg, bool(retained)]
        if mqttv5:
            args.append(decoded_props)
        self._cb(*args)

        if op & 6 == 2:  # qos 1
            pkt = bytearray(b"\x40\x02\0\0")  # Send PUBACK
            struct.pack_into("!H", pkt, 2, pid)
            await self._as_write(pkt)
        elif op & 6 == 4:  # qos 2 not supported
            # Note the callback above has already run for this message
            raise OSError(-1, "QoS 2 not supported")
649
650
651# MQTTClient class. Handles issues relating to connectivity.
652
653
654class MQTTClient(MQTT_base):
655    def __init__(self, config):
656        super().__init__(config)
657        self._isconnected = False  # Current connection state
658        keepalive = 1000 * self._keepalive  # ms
659        self._ping_interval = keepalive // 4 if keepalive else 20000
660        p_i = (
661            config["ping_interval"] * 1000
662        )  # Can specify shorter e.g. for subscribe-only
663        if p_i and p_i < self._ping_interval:
664            self._ping_interval = p_i
665        self._in_connect = False
666        self._has_connected = False  # Define 'Clean Session' value to use.
667        self._tasks = []
668
669    async def connect(
670        self, *, quick=False
671    ):  # Quick initial connect option for battery apps
672        if not self._has_connected:
673            # await self.wifi_connect(quick)  # On 1st call, caller handles error
674            # Note this blocks if DNS lookup occurs. Do it once to prevent
675            # blocking during later internet outage:
676            self._addr = socket.getaddrinfo(self.server, self.port)[0][-1]
677            print(self._addr)
678        self._in_connect = True  # Disable low level ._isconnected check
679        try:
680            is_clean = self._clean
681            if not self._has_connected and self._clean_init and not self._clean:
682                if self.mqttv5:
683                    is_clean = True
684                else:
685                    # Power up. Clear previous session data but subsequently save it.
686                    # Issue #40
687                    await self._connect(True)  # Connect with clean session
688                    try:
689                        async with self.lock:
690                            self._sock.write(
691                                b"\xe0\0"
692                            )  # Force disconnect but keep socket open
693                    except OSError:
694                        pass
695                    self.dprint("Waiting for disconnect")
696                    await asyncio.sleep(2)  # Wait for broker to disconnect
697                    self.dprint("About to reconnect with unclean session.")
698            await self._connect(is_clean)
699        except Exception:
700            self._close()
701            self._in_connect = False  # Caller may run .isconnected()
702            raise
703        self.rcv_pids.clear()
704        # If we get here without error broker/LAN must be up.
705        self._isconnected = True
706        self._in_connect = False  # Low level code can now check connectivity.
707        if not self._events:
708            asyncio.create_task(self._wifi_handler(True))  # User handler.
709        if not self._has_connected:
710            self._has_connected = True  # Use normal clean flag on reconnect.
711            asyncio.create_task(self._keep_connected())
712            # Runs forever unless user issues .disconnect()
713
714        asyncio.create_task(self._handle_msg())  # Task quits on connection fail.
715        self._tasks.append(asyncio.create_task(self._keep_alive()))
716        if self.DEBUG:
717            self._tasks.append(asyncio.create_task(self._memory()))
718        if self._events:
719            self.up.set()  # Connectivity is up
720        else:
721            asyncio.create_task(self._connect_handler(self))  # User handler.
722
723    # Launched by .connect(). Runs until connectivity fails. Checks for and
724    # handles incoming messages.
    async def _handle_msg(self):
        """Poll for incoming packets until the link drops, then request a reconnect.

        Only OSError ends the loop quietly; any other exception propagates out
        of the task without ``_reconnect`` being called.
        """
        try:
            while self.isconnected():
                async with self.lock:
                    await self.wait_msg()  # Immediate return if no message
                # https://github.com/peterhinch/micropython-mqtt/issues/166
                # A delay > 0 is necessary for webrepl compatibility.
                await asyncio.sleep_ms(5)  # Let other tasks get lock

        except OSError:
            pass  # Not ignored: falls through to the reconnect below
        self._reconnect()  # Broker or WiFi fail.
737
738    # Keep broker alive MQTT spec 3.1.2.10 Keep Alive.
739    # Runs until ping failure or no response in keepalive period.
740    async def _keep_alive(self):
741        while self.isconnected():
742            pings_due = ticks_diff(ticks_ms(), self.last_rx) // self._ping_interval
743            if pings_due >= 4:
744                self.dprint("Reconnect: broker fail.")
745                break
746            await asyncio.sleep_ms(self._ping_interval)
747            try:
748                await self._ping()
749            except OSError:
750                break
751        self._reconnect()  # Broker or WiFi fail.
752
753    async def _kill_tasks(self, kill_skt):  # Cancel running tasks
754        for task in self._tasks:
755            task.cancel()
756        self._tasks.clear()
757        await asyncio.sleep_ms(0)  # Ensure cancellation complete
758        if kill_skt:  # Close socket
759            self._close()
760
761    # DEBUG: show RAM messages.
762    async def _memory(self):
763        while True:
764            await asyncio.sleep(20)
765            gc.collect()
766            self.dprint("RAM free %d alloc %d", gc.mem_free(), gc.mem_alloc())
767
768    def isconnected(self):
769        if self._in_connect:  # Disable low-level check during .connect()
770            return True
771
772        return self._isconnected
773
774    def _reconnect(self):  # Schedule a reconnection if not underway.
775        if self._isconnected:
776            self._isconnected = False
777            asyncio.create_task(self._kill_tasks(True))  # Shut down tasks and socket
778            if self._events:  # Signal an outage
779                self.down.set()
780            else:
781                asyncio.create_task(self._wifi_handler(False))  # User handler.
782
783    # Await broker connection.
784    async def _connection(self):
785        while not self._isconnected:
786            await asyncio.sleep(1)
787
788    # Scheduled on 1st successful connection. Runs forever maintaining wifi and
789    # broker connection. Must handle conditions at edge of WiFi range.
790    async def _keep_connected(self):
791        while self._has_connected:
792            if self.isconnected():  # Pause for 1 second
793                await asyncio.sleep(1)
794                gc.collect()
795            else:  # Link is down, socket is closed, tasks are killed
796                if (
797                    not self._has_connected
798                ):  # User has issued the terminal .disconnect()
799                    self.dprint("Disconnected, exiting _keep_connected")
800                    break
801                try:
802                    await self.connect()
803                    # Now has set ._isconnected and scheduled _connect_handler().
804                    self.dprint("Reconnect OK!")
805                except OSError as e:
806                    self.dprint("Error in reconnect. %s", e)
807                    # Can get ECONNABORTED or -1. The latter signifies no or bad CONNACK received.
808                    self._close()  # Disconnect and try again.
809                    self._in_connect = False
810                    self._isconnected = False
811        self.dprint("Disconnected, exited _keep_connected")
812
813    async def subscribe(self, topic, qos=0, properties=None):
814        qos_check(qos)
815        while 1:
816            await self._connection()
817            try:
818                return await super().subscribe(topic, qos, properties)
819            except OSError:
820                pass
821            self._reconnect()  # Broker or WiFi fail.
822
823    async def unsubscribe(self, topic, properties=None):
824        while 1:
825            await self._connection()
826            try:
827                return await super().unsubscribe(topic, properties)
828            except OSError:
829                pass
830            self._reconnect()  # Broker or WiFi fail.
831
832    async def publish(self, topic, msg, retain=False, qos=0, properties=None):
833        qos_check(qos)
834        while 1:
835            await self._connection()
836            try:
837                return await super().publish(topic, msg, retain, qos, properties)
838            except OSError:
839                pass
840            self._reconnect()  # Broker or WiFi fail.
841
842
843# =======================
844# mqtt_test.py
845# =======================
846
847import json
848import asyncio
849
850
class MqttModule:
    """Demo wrapper that drives an ``MQTTClient`` against a test broker.

    ``start()`` builds the client and connects, retrying until it succeeds.
    ``co_tasks()`` then spawns the background coroutines: outage watcher,
    subscriber, received-message printer and heartbeat publisher.
    """

    # Topics used by the demo. Kept as class attributes so they can be
    # overridden without editing the coroutines below.
    DOWN_TOPIC = "/cassia/test/down"  # Subscribed to by _subscriber().
    UP_TOPIC = "/cassia/test/up"  # Heartbeats are published here.

    # Seconds to wait after a failed connection attempt before retrying.
    RETRY_DELAY = 3

    def __init__(self):
        # Created in _start(); None until then.
        self.client: MQTTClient = None

    async def _net_down_watcher(self):
        """Print a notice each time the client's ``down`` event is set."""
        while True:
            await self.client.down.wait()
            self.client.down.clear()
            print("!!!disconnected")

    async def _subscriber(self):
        """Subscribe to ``DOWN_TOPIC`` each time the ``up`` event is set."""
        topic = self.DOWN_TOPIC
        while True:
            await self.client.up.wait()
            self.client.up.clear()
            print("connected to broker")

            await self.client.subscribe(topic)
            print(f"sub topic ok: {topic}")

    async def _messager(self):
        """Print every message delivered through the client's queue."""
        async for topic, msg, retained in self.client.queue:
            print("recv:", topic, msg, retained)

    async def _start(self):
        """Create the client and connect, retrying until it succeeds."""
        config = {
            "client_id": "CC:1B:E0:00:00:01",
            "server": "test.mosquitto.org",
            # NOTE(review): the port is passed as a string; confirm that
            # the MQTTClient build in use accepts it (an int may be safer).
            "port": "1884",
            "user": "rw",
            "password": "readwrite",
            "keepalive": 60,
            "ping_interval": 0,
            "ssl": False,
            "ssl_params": {},
            "response_time": 10,
            "clean_init": True,
            "clean": True,
            "max_repubs": 4,
            "will": None,
            "subs_cb": lambda *_: None,
            "ssid": None,
            "wifi_pw": None,
            "queue_len": 64,
            "gateway": False,
            "mqttv5": False,
            "mqttv5_con_props": None,
        }
        MQTTClient.DEBUG = True
        self.client = MQTTClient(config=config)

        while True:
            print("connect start...")
            try:
                await self.client.connect()
            except Exception as exc:
                # Report why the attempt failed instead of discarding the
                # exception, then back off before trying again.
                print("connect failed, waiting retry...", repr(exc))
                await asyncio.sleep(self.RETRY_DELAY)
            else:
                print("connect ok")
                break

    async def pub(self, topic: str, data: dict, qos: int = 0):
        """Serialise *data* as JSON and publish it on *topic* with *qos*."""
        msg = json.dumps(data)
        print("pub:", topic, msg)
        await self.client.publish(topic, msg, qos=qos)

    async def _heartbeater(self):
        """Publish a fixed payload on ``UP_TOPIC`` every 3 seconds (QoS 1)."""
        while True:
            await self.pub(self.UP_TOPIC, {"hello": "world"}, qos=1)
            await asyncio.sleep(3)

    def co_tasks(self) -> list[asyncio.Task]:
        """Start the background coroutines and return their tasks.

        Must be called with the event loop running, because each coroutine
        is scheduled with ``asyncio.create_task``.
        """
        return [
            asyncio.create_task(self._net_down_watcher()),
            asyncio.create_task(self._subscriber()),
            asyncio.create_task(self._messager()),
            asyncio.create_task(self._heartbeater()),
        ]

    async def start(self):
        """Public entry point: build the client and connect to the broker."""
        await self._start()
933
async def main():
    """Connect the demo module, then run its background tasks together."""
    module = MqttModule()
    await module.start()

    # co_tasks() schedules the coroutines; gather() waits on all of them.
    await asyncio.gather(*module.co_tasks())
940
941
# Script entry point: run main() on the asyncio event loop.
asyncio.run(main())