本地运行

MicroPython 支持多平台,可以在本地运行,用于测试 APP 的基础代码逻辑。

使用系统

cat /etc/issue
Ubuntu 22.04 LTS \n \l

uname -a
Linux VM-0-9-ubuntu 5.15.0-106-generic

下载源码

git clone --branch v1.24.1 --depth 1 https://github.com/micropython/micropython.git
git -C micropython submodule update --init --recursive

安装依赖

sudo apt update
sudo apt install -y build-essential git pkg-config \
    libffi-dev libssl-dev libbz2-dev liblzma-dev \
    libreadline-dev libsqlite3-dev libgdbm-dev \
    libncurses5-dev zlib1g-dev libmpdec-dev \
    libmbedtls-dev libdb5.3-dev uuid-dev

编译构建

cd micropython/ports/unix
make clean
make -j$(nproc) MICROPY_PY_USER_CMODULE=0


# LINK build-standard/micropython
#   text       data     bss     dec     hex filename
# 764607      69328    7088  841023   cd53f build-standard/micropython

查看版本

./build-standard/micropython --version

# MicroPython v1.24.1 on 2025-09-09; linux [GCC 11.2.0] version

准备脚本

准备hello.py,脚本内容如下

echo 'print("hello")' > hello.py

运行脚本

./build-standard/micropython hello.py
hello

Cassiablue API Mock

如需在本地调试 Cassiablue API 相关的代码,可以参考下面基于网关 RESTful API 实现的 Cassiablue API Mock,用于快速测试基本逻辑功能。

备注

  • 仅用于本地调试代码基本逻辑,勿用于压力测试和稳定性测试

  • 脚本实际运行结果,请以M2000网关内运行为准

  1"""
  2Module name: cassiablue.py  
  3Purpose: Local MicroPython debugging mock. Exposes the same high-level API as the real gateway by forwarding calls to its REST + Server-Sent-Events interface, so you can develop and test your MicroPython logic on a PC without flashing firmware.  
  4MicroPython compatibility: v1.24.1  
  5Important: Do **not** copy this file into the gateway itself—the gateway already contains a native C implementation. This module is **only** for convenient desktop debugging and has no performance guarantees.  
  6Usage: Change the variable `GATEWAY` to the actual IP address of your gateway.
  7"""
  8
# Gateway host (IP address) that the REST and SSE requests in this module target.
# Empty by default: edit it here or call set_gateway() before using the API.
GATEWAY = ""
 10
 11import json
 12import asyncio
 13
 14from cassia_log import get_logger
 15import aiohttp
 16
 17try:
 18    from typing import Optional
 19except ImportError:
 20    pass
 21
 22
# Module-level logger used by the free functions of this mock.
log = get_logger("__mock_cassiablue__")

# Static gateway identity placeholders; nothing in the visible code reads them.
# NOTE(review): presumably these mirror values exposed by the native module - confirm.
_gateway_type = "M2000"
_gateway_mac = "00:00:00:00:00:00"
_gateway_ver = ""
 28
 29
 30class AsyncQueue:
 31    def __init__(self):
 32        self._buffer = []
 33        self._flag = asyncio.ThreadSafeFlag()
 34
 35    def put_nowait(self, item):
 36        self._buffer.append(item)
 37        self._flag.set()
 38
 39    async def get(self):
 40        while not self._buffer:
 41            await self._flag.wait()
 42        return self._buffer.pop(0)
 43
 44
 45async def send_cmd(url, method="GET", query=None, body=None):
 46    global GATEWAY
 47    url = f"http://{GATEWAY}{url}"
 48
 49    print("send cmd start:", method, url, query, body)
 50
 51    async with aiohttp.ClientSession() as session:
 52        if body is not None:
 53            async with session.request(
 54                method=method, url=url, json=json.loads(body)
 55            ) as resp:
 56                if resp.status == 200:
 57                    text = await resp.text()
 58                    return True, text
 59                else:
 60                    text = await resp.text()
 61                    return False, text
 62        else:
 63            async with session.request(method=method, url=url) as resp:
 64                if resp.status == 200:
 65                    text = await resp.text()
 66                    return True, text
 67                else:
 68                    text = await resp.text()
 69                    return False, text
 70
 71
 72# address is a string like "00:11:22:33:44:55"
 73# params is a json string like '{"param1": "value1", "param2": "value2"}'
 74async def connect(addr, params=None):
 75    if not addr:
 76        raise ValueError("Address cannot be empty")
 77    url = "/gap/nodes/{}/connection".format(addr)
 78    return await send_cmd(url, "POST", None, body=params)
 79
 80
 81# address is a string like "00:11:22:33:44:55"
 82async def disconnect(addr):
 83    if not addr:
 84        raise ValueError("Address cannot be empty")
 85    url = "/gap/nodes/{}/connection".format(addr)
 86    return await send_cmd(url, "DELETE")
 87
 88
 89async def get_connected_devices():
 90    return await send_cmd("/gap/nodes", "GET")
 91
 92
 93async def gatt_discover(addr):
 94    if not addr:
 95        raise ValueError("Address cannot be empty")
 96    url = "/gatt/nodes/{}/services/characteristics/descriptors".format(addr)
 97    return await send_cmd(url, "GET")
 98
 99
async def gatt_read(addr, handle):
    """Read the value stored at *handle* on the device at *addr*.

    Sends ``GET /gatt/nodes/<addr>/handle/<handle>/value`` through send_cmd
    and returns send_cmd's result.

    Raises:
        ValueError: if *addr* or *handle* is empty (falsy).
    """
    if not (addr and handle):
        raise ValueError("Address and handle cannot be empty")
    endpoint = f"/gatt/nodes/{addr}/handle/{handle}/value"
    return await send_cmd(endpoint, "GET")
105
106
async def gatt_write(addr, handle, value):
    """Write *value* to *handle* on the device at *addr*.

    Sends ``GET /gatt/nodes/<addr>/handle/<handle>/value/<value>`` through
    send_cmd (the value travels in the URL) and returns send_cmd's result.

    Raises:
        ValueError: if *addr* or *handle* is empty (falsy), or *value* is None.
    """
    incomplete = not addr or not handle or value is None
    if incomplete:
        raise ValueError("Address, handle, and value cannot be empty")
    endpoint = f"/gatt/nodes/{addr}/handle/{handle}/value/{value}"
    return await send_cmd(endpoint, "GET")
112
113
class SSEClient:
    """Line-oriented Server-Sent-Events reader for one gateway endpoint.

    The client opens a raw TCP connection, sends a single HTTP GET asking for
    ``text/event-stream`` and then reads the response line by line. Every line
    that carries a JSON payload is decoded and pushed onto ``self.queue``;
    all other lines (status line, headers, anything that is not JSON) are
    skipped.

    Args:
        host: Gateway host name or IP address.
        path: Request path including any query string.
        reconnect_delay: Seconds to wait before reconnecting after a failure.
        port: TCP port of the gateway HTTP server.
    """

    _DATA_PREFIX = "data:"

    def __init__(
        self,
        host: str,
        path: str,
        reconnect_delay: int = 3,
        port: int = 80,
    ):
        self.log = get_logger(self.__class__.__name__)
        self.host = host
        self.path = path
        self.port = port
        self.reconnect_delay = reconnect_delay
        self.running = True
        self.queue = AsyncQueue()
        self.reader = None
        self.writer = None

    async def connect(self):
        """Open the TCP connection and send the SSE request.

        Any connection left over from an earlier call is closed first.
        Errors from opening the connection or writing the request propagate
        to the caller.
        """
        await self._close()
        self.log.info(f"connect to sse start: {self.host}{self.path}")
        self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
        host_header = self.host if self.port == 80 else f"{self.host}:{self.port}"
        req = (
            f"GET {self.path} HTTP/1.1\r\n"
            f"Accept: text/event-stream\r\n"
            f"Host: {host_header}\r\n"
            f"Connection: keep-alive\r\n"
            "\r\n"
        )
        self.writer.write(req.encode())
        await self.writer.drain()
        self.log.info("connect to sse ok")

    async def _close(self):
        """Close the current connection, if any, and forget reader/writer."""
        writer = self.writer
        self.reader = None
        self.writer = None
        if writer is None:
            return
        try:
            writer.close()
            await writer.wait_closed()
        except OSError as e:
            # Best effort: the peer may already have torn the socket down.
            self.log.debug("sse close error:", e)

    @staticmethod
    def _parse_event(line):
        """Decode one stream line into an event, or return None to skip it.

        A leading ``data:`` field name is removed (only at the start of the
        line, so the same text inside the payload is left alone). Lines whose
        remaining text does not start with ``{`` or ``[`` are not events.
        When the decoded object has a ``bdaddrs`` list, its first entry is
        flattened into top-level ``bdaddr`` / ``bdaddrType`` keys.

        Raises:
            ValueError: if the payload is not valid JSON.
            IndexError, KeyError: if ``bdaddrs`` lacks the expected entry.
        """
        text = line.strip()
        if text.startswith(SSEClient._DATA_PREFIX):
            text = text[len(SSEClient._DATA_PREFIX):].lstrip()
        if not text or text[0] not in ("{", "["):
            return None

        data = json.loads(text)
        if isinstance(data, dict) and "bdaddrs" in data:
            first = data["bdaddrs"][0]
            data["bdaddr"] = first["bdaddr"]
            data["bdaddrType"] = first["bdaddrType"]
            del data["bdaddrs"]
        return data

    async def co_read(self):
        """Read the stream until stop() is called, queuing every event.

        When the connection is missing, closed by the peer, or fails, the
        socket is closed, ``reconnect_delay`` seconds are waited and a fresh
        connection is opened on the next pass. Lines that cannot be decoded
        are logged and skipped without dropping the connection.
        """
        while self.running:
            try:
                if self.reader is None:
                    await self.connect()
                raw = await self.reader.readline()
                if not raw:
                    raise OSError("connection closed")
            except Exception as e:
                if not self.running:
                    # stop() closed the socket under us; that is not an error.
                    break
                self.log.error("sse disconnected:", e)
                await self._close()
                await asyncio.sleep(self.reconnect_delay)
                self.log.info("reconnecting...")
                continue

            line = raw
            try:
                line = raw.decode().strip()
                if not line:
                    continue
                self.log.debug("raw line:", line)
                data = self._parse_event(line)
            except Exception as e:
                self.log.info("parse data error:", e, line)
                continue

            if data is not None:
                self.queue.put_nowait(data)

    async def stop(self):
        """Stop the read loop and release the connection."""
        self.running = False
        await self._close()
186
187
188#######################
189# scan sse
190#######################
191
# SSE client feeding scan results; None until start_scan() creates it.
scan_sse_client: Optional[SSEClient] = None
193
194
class BLEScanResult:
    """Async iterator over the items queued by the scan SSE client.

    It never raises StopAsyncIteration; each step waits for the next item.
    """

    def __aiter__(self):
        """Return the iterator itself."""
        return self

    async def __anext__(self):
        """Wait for and return the next item from ``scan_sse_client.queue``."""
        return await scan_sse_client.queue.get()
203
204
def scan_result():
    """Return a fresh ``BLEScanResult`` async iterator."""
    results = BLEScanResult()
    return results
207
208
async def start_scan(query: str = None):
    """Open the scan event stream and start reading it in the background.

    Args:
        query: Optional extra query string (without ``?``). ``event=1`` is
            added in front of it unless it already contains that text.

    Returns:
        The tuple ``(True, "OK")`` once the stream request has been sent.

    Errors raised while connecting propagate to the caller.
    """
    global scan_sse_client

    # An empty query must not leave a dangling "&" after "event=1".
    if not query:
        qs = "event=1"
    elif "event=1" in query:
        qs = query
    else:
        qs = "event=1&" + query

    log.info("start scan:", qs)

    # Stop the previous stream first; otherwise replacing the client would
    # leave its read loop (and connection) running with no way to reach it.
    if scan_sse_client is not None:
        await scan_sse_client.stop()

    scan_sse_client = SSEClient(
        host=GATEWAY,
        path=f"/gap/nodes?{qs}",
    )

    await scan_sse_client.connect()

    # Keep a reference to the reader task so it cannot be garbage-collected
    # while it is still running.
    scan_sse_client._reader_task = asyncio.create_task(scan_sse_client.co_read())

    return True, "OK"
232
233
234#######################
235# notify sse
236#######################
237
# SSE client feeding notifications; None until start_recv_notify() creates it.
notify_sse_client: Optional[SSEClient] = None
239
240
class BLENotifyResult:
    """Async iterator over the items queued by the notify SSE client.

    It never raises StopAsyncIteration; each step waits for the next item.
    """

    def __aiter__(self):
        """Return the iterator itself."""
        return self

    async def __anext__(self):
        """Wait for and return the next item from ``notify_sse_client.queue``."""
        return await notify_sse_client.queue.get()
249
250
def notify_result():
    """Return a fresh ``BLENotifyResult`` async iterator."""
    results = BLENotifyResult()
    return results
253
254
async def start_recv_notify(query: str = None):
    """Open the notification event stream and start reading it in the background.

    Args:
        query: Optional extra query string (without ``?``). ``event=1`` is
            added in front of it unless it already contains that text.

    Returns:
        The tuple ``(True, "OK")`` once the stream request has been sent.

    Errors raised while connecting propagate to the caller.
    """
    global notify_sse_client

    # An empty query must not leave a dangling "&" after "event=1".
    if not query:
        qs = "event=1"
    elif "event=1" in query:
        qs = query
    else:
        qs = "event=1&" + query

    log.info("start notify:", qs)

    # Stop the previous stream first; otherwise replacing the client would
    # leave its read loop (and connection) running with no way to reach it.
    if notify_sse_client is not None:
        await notify_sse_client.stop()

    notify_sse_client = SSEClient(
        host=GATEWAY,
        path=f"/gatt/nodes?{qs}",
    )

    await notify_sse_client.connect()

    # Keep a reference to the reader task so it cannot be garbage-collected
    # while it is still running.
    notify_sse_client._reader_task = asyncio.create_task(notify_sse_client.co_read())

    return True, "OK"
278
279
280#######################
281# state sse
282#######################
283
# SSE client feeding connection-state events; None until
# start_recv_connection_state() creates it.
state_sse_client: Optional[SSEClient] = None
285
286
class BLEConnectionResult:
    """Async iterator over the items queued by the connection-state SSE client.

    It never raises StopAsyncIteration; each step waits for the next item.
    """

    def __aiter__(self):
        """Return the iterator itself."""
        return self

    async def __anext__(self):
        """Wait for and return the next item from ``state_sse_client.queue``."""
        return await state_sse_client.queue.get()
295
296
def connection_result():
    """Return a fresh ``BLEConnectionResult`` async iterator."""
    results = BLEConnectionResult()
    return results
299
300
async def start_recv_connection_state():
    """Open the connection-state event stream and read it in the background.

    Returns:
        The tuple ``(True, "OK")`` once the stream request has been sent.

    Errors raised while connecting propagate to the caller.
    """
    global state_sse_client

    log.info("start state")

    # Stop the previous stream first; otherwise replacing the client would
    # leave its read loop (and connection) running with no way to reach it.
    if state_sse_client is not None:
        await state_sse_client.stop()

    state_sse_client = SSEClient(
        host=GATEWAY,
        path="/management/nodes/connection-state",
    )

    await state_sse_client.connect()

    # Keep a reference to the reader task so it cannot be garbage-collected
    # while it is still running.
    state_sse_client._reader_task = asyncio.create_task(state_sse_client.co_read())

    return True, "OK"
315
316
def set_gateway(ip: str):
    """Point the mock at the gateway reachable at *ip*.

    Rebinds the module-level ``GATEWAY`` that later requests read.
    """
    global GATEWAY
    GATEWAY = ip