Run Locally

micropython supports multiple platforms and can be used to run and test the basic APP logic locally.

System Information

cat /etc/issue
Ubuntu 22.04 LTS \n \l

uname -a
Linux VM-0-9-ubuntu 5.15.0-106-generic

Download Source Code

git clone --branch v1.24.1 --depth 1 https://github.com/micropython/micropython.git
git submodule update --init --recursive

Install Dependencies

sudo apt update
sudo apt install -y build-essential git pkg-config \
    libffi-dev libssl-dev libbz2-dev liblzma-dev \
    libreadline-dev libsqlite3-dev libgdbm-dev \
    libncurses5-dev zlib1g-dev libmpdec-dev \
    libmbedtls-dev libdb5.3-dev uuid-dev

Compile and Build

cd micropython/ports/unix
make clean
make -j$(nproc) MICROPY_PY_USER_CMODULE=0


# LINK build-standard/micropython
#   text       data     bss     dec     hex filename
# 764607      69328    7088  841023   cd53f build-standard/micropython

Check Version

./build-standard/micropython --version

# MicroPython v1.24.1 on 2025-09-09; linux [GCC 11.2.0] version

Prepare Script

Prepare hello.py with the following script content:

echo 'print("hello")' > hello.py

Run the Script

./build-standard/micropython hello.py
hello

Cassiablue API Mock

To debug code related to the Cassiablue API, you can refer to the following implementation using the gateway RESTful API to mock the Cassiablue API, which allows for quick testing of basic logic and functionality.

Note

  • For local debugging of basic code logic only; not intended for stress or stability testing.

  • The actual script execution results should be based on operation within the M2000 gateway.

  1"""
  2Module name: cassiablue.py  
  3Purpose: Local MicroPython debugging mock. Exposes the same high-level API as the real gateway by forwarding calls to its REST + Server-Sent-Events interface, so you can develop and test your MicroPython logic on a PC without flashing firmware.  
  4MicroPython compatibility: v1.24.1  
  5Important: Do **not** copy this file into the gateway itself—the gateway already contains a native C implementation. This module is **only** for convenient desktop debugging and has no performance guarantees.  
  6Usage: Change the variable `GATEWAY` to the actual IP address of your gateway.
  7"""
  8
  9GATEWAY = ""
 10
 11import json
 12import asyncio
 13
 14from cassia_log import get_logger
 15import aiohttp
 16
 17try:
 18    from typing import Optional
 19except ImportError:
 20    pass
 21
 22
 23log = get_logger("__mock_cassiablue__")
 24
 25_gateway_type = "M2000"
 26_gateway_mac = "00:00:00:00:00:00"
 27_gateway_ver = ""
 28
 29
 30class AsyncQueue:
 31    def __init__(self):
 32        self._buffer = []
 33        self._flag = asyncio.ThreadSafeFlag()
 34
 35    def put_nowait(self, item):
 36        self._buffer.append(item)
 37        self._flag.set()
 38
 39    async def get(self):
 40        while not self._buffer:
 41            await self._flag.wait()
 42        return self._buffer.pop(0)
 43
 44
 45async def send_cmd(url, method="GET", query=None, body=None):
 46    global GATEWAY
 47    url = f"http://{GATEWAY}{url}"
 48
 49    print("send cmd start:", method, url, query, body)
 50
 51    async with aiohttp.ClientSession() as session:
 52        if body is not None:
 53            async with session.request(
 54                method=method, url=url, json=json.loads(body)
 55            ) as resp:
 56                if resp.status == 200:
 57                    text = await resp.text()
 58                    return True, text
 59                else:
 60                    text = await resp.text()
 61                    return False, text
 62        else:
 63            async with session.request(method=method, url=url) as resp:
 64                if resp.status == 200:
 65                    text = await resp.text()
 66                    return True, text
 67                else:
 68                    text = await resp.text()
 69                    return False, text
 70
 71
 72# address is a string like "00:11:22:33:44:55"
 73# params is a json string like '{"param1": "value1", "param2": "value2"}'
 74async def connect(addr, params=None):
 75    if not addr:
 76        raise ValueError("Address cannot be empty")
 77    url = "/gap/nodes/{}/connection".format(addr)
 78    return await send_cmd(url, "POST", None, body=params)
 79
 80
 81# address is a string like "00:11:22:33:44:55"
 82async def disconnect(addr):
 83    if not addr:
 84        raise ValueError("Address cannot be empty")
 85    url = "/gap/nodes/{}/connection".format(addr)
 86    return await send_cmd(url, "DELETE")
 87
 88
 89async def get_connected_devices():
 90    return await send_cmd("/gap/nodes", "GET")
 91
 92
 93async def gatt_discover(addr):
 94    if not addr:
 95        raise ValueError("Address cannot be empty")
 96    url = "/gatt/nodes/{}/services/characteristics/descriptors".format(addr)
 97    return await send_cmd(url, "GET")
 98
 99
100async def gatt_read(addr, handle):
101    if not addr or not handle:
102        raise ValueError("Address and handle cannot be empty")
103    url = "/gatt/nodes/{}/handle/{}/value".format(addr, handle)
104    return await send_cmd(url, "GET")
105
106
107async def gatt_write(addr, handle, value):
108    if not addr or not handle or value is None:
109        raise ValueError("Address, handle, and value cannot be empty")
110    url = "/gatt/nodes/{}/handle/{}/value/{}".format(addr, handle, value)
111    return await send_cmd(url, "GET")
112
113
114class SSEClient:
115    def __init__(
116        self,
117        host: str,
118        path: str,
119        reconnect_delay: int = 3,
120    ):
121        self.log = get_logger(self.__class__.__name__)
122        self.host = host
123        self.path = path
124        self.reconnect_delay = reconnect_delay
125        self.running = True
126        self.queue = AsyncQueue()
127        self.reader = None
128        self.writer = None
129
130    async def connect(self):
131        self.log.info(f"connect to sse start: {self.host}{self.path}")
132        self.reader, self.writer = await asyncio.open_connection(self.host, 80)
133        req = (
134            f"GET {self.path} HTTP/1.1\r\n"
135            f"Accept: text/event-stream\r\n"
136            f"Host: {self.host}\r\n"
137            f"Connection: keep-alive\r\n"
138            "\r\n"
139        )
140        self.writer.write(req.encode())
141        await self.writer.drain()
142        self.log.info(f"connect to sse ok")
143
144    async def co_read(self):
145        while self.running:
146            try:
147                line = await self.reader.readline()
148
149                if not line:
150                    raise OSError("connection closed")
151
152                line = line.decode().strip()
153
154                if not line:
155                    continue
156
157                log.debug("raw line:", line)
158                if not (line[0] in ("{", "[") or line.startswith("data: {")):
159                    continue
160
161                log.debug("raw line:", line)
162                line = line.replace("data: ", "")
163                line = line.replace("\n", "")
164                line = line.replace("\r", "")
165                line = line.replace("\r\n", "")
166
167                try:
168                    data = json.loads(line)
169
170                    if "bdaddrs" in data:
171                        data["bdaddr"] = data["bdaddrs"][0]["bdaddr"]
172                        data["bdaddrType"] = data["bdaddrs"][0]["bdaddrType"]
173                        del data["bdaddrs"]
174
175                    self.queue.put_nowait(data)
176                except Exception as e:
177                    self.log.info("parse data error:", e, line)
178
179            except Exception as e:
180                self.log.error("sse disconnected:", e)
181                await asyncio.sleep(self.reconnect_delay)
182                self.log.info("reconnecting...")
183
184    async def stop(self):
185        self.running = False
186
187
188#######################
189# scan sse
190#######################
191
192scan_sse_client: Optional[SSEClient] = None
193
194
195class BLEScanResult:
196    def __aiter__(self):
197        return self
198
199    async def __anext__(self):
200        global scan_sse_client
201        item = await scan_sse_client.queue.get()
202        return item
203
204
205def scan_result():
206    return BLEScanResult()
207
208
209async def start_scan(query: str = None):
210    global scan_sse_client
211
212    qs = ""
213    if query is None:
214        qs = "event=1"
215    else:
216        if "event=1" not in query:
217            qs = qs + "event=1&"
218        qs = qs + query
219
220    log.info("start scan:", qs)
221
222    scan_sse_client = SSEClient(
223        host=GATEWAY,
224        path=f"/gap/nodes?{qs}",
225    )
226
227    await scan_sse_client.connect()
228
229    asyncio.create_task(scan_sse_client.co_read())
230
231    return True, "OK"
232
233
234#######################
235# notify sse
236#######################
237
238notify_sse_client: Optional[SSEClient] = None
239
240
241class BLENotifyResult:
242    def __aiter__(self):
243        return self
244
245    async def __anext__(self):
246        global notify_sse_client
247        item = await notify_sse_client.queue.get()
248        return item
249
250
251def notify_result():
252    return BLENotifyResult()
253
254
255async def start_recv_notify(query: str = None):
256    global notify_sse_client
257
258    qs = ""
259    if query is None:
260        qs = "event=1"
261    else:
262        if "event=1" not in query:
263            qs = qs + "event=1&"
264        qs = qs + query
265
266    log.info("start notify:", qs)
267
268    notify_sse_client = SSEClient(
269        host=GATEWAY,
270        path=f"/gatt/nodes?{qs}",
271    )
272
273    await notify_sse_client.connect()
274
275    asyncio.create_task(notify_sse_client.co_read())
276
277    return True, "OK"
278
279
280#######################
281# state sse
282#######################
283
284state_sse_client: Optional[SSEClient] = None
285
286
287class BLEConnectionResult:
288    def __aiter__(self):
289        return self
290
291    async def __anext__(self):
292        global state_sse_client
293        item = await state_sse_client.queue.get()
294        return item
295
296
297def connection_result():
298    return BLEConnectionResult()
299
300
301async def start_recv_connection_state():
302    global state_sse_client
303
304    log.info("start state")
305
306    state_sse_client = SSEClient(
307        host=GATEWAY,
308        path=f"/management/nodes/connection-state",
309    )
310
311    await state_sse_client.connect()
312    asyncio.create_task(state_sse_client.co_read())
313
314    return True, "OK"
315
316
317def set_gateway(ip: str):
318    global GATEWAY
319    GATEWAY = ip