本地运行
micropython
支持多平台,可以在本地运行测试APP基础代码逻辑。
使用系统
cat /etc/issue
Ubuntu 22.04 LTS \n \l
uname -a
Linux VM-0-9-ubuntu 5.15.0-106-generic
下载源码
git clone --branch v1.24.1 --depth 1 https://github.com/micropython/micropython.git
git -C micropython submodule update --init --recursive
安装依赖
sudo apt update
sudo apt install -y build-essential git pkg-config \
libffi-dev libssl-dev libbz2-dev liblzma-dev \
libreadline-dev libsqlite3-dev libgdbm-dev \
libncurses5-dev zlib1g-dev libmpdec-dev \
libmbedtls-dev libdb5.3-dev uuid-dev
编译构建
cd micropython/ports/unix
make clean
make -j$(nproc) MICROPY_PY_USER_CMODULE=0
# LINK build-standard/micropython
# text data bss dec hex filename
# 764607 69328 7088 841023 cd53f build-standard/micropython
查看版本
./build-standard/micropython --version
# MicroPython v1.24.1 on 2025-09-09; linux [GCC 11.2.0] version
准备脚本
准备hello.py,脚本内容如下
echo 'print("hello")' > hello.py
运行脚本
./build-standard/micropython hello.py
hello
Cassiablue API Mock
如需调试与 Cassiablue API 相关的代码,可以参考下面基于网关 RESTful API 实现的 Cassiablue API Mock,用于快速测试基本逻辑功能。
备注
仅用于本地调试代码的基本逻辑,请勿用于压力测试和稳定性测试
脚本实际运行结果,请以M2000网关内运行为准
"""
Module name: cassiablue.py
Purpose: Local MicroPython debugging mock. Exposes the same high-level API as the real gateway by forwarding calls to its REST + Server-Sent-Events interface, so you can develop and test your MicroPython logic on a PC without flashing firmware.
MicroPython compatibility: v1.24.1
Important: Do **not** copy this file into the gateway itself—the gateway already contains a native C implementation. This module is **only** for convenient desktop debugging and has no performance guarantees.
Usage: Change the variable `GATEWAY` to the actual IP address of your gateway (or call `set_gateway()` at runtime).
"""

# Host/IP of the gateway. It is used to build every REST URL
# ("http://<GATEWAY><path>") and as the TCP host of the SSE streams.
GATEWAY = ""

import asyncio
import json

try:
    # `typing` is optional: it is only needed for the annotations below.
    from typing import Optional
except ImportError:
    pass

import aiohttp
from cassia_log import get_logger


# Module-level logger shared by the free functions in this file.
log = get_logger("__mock_cassiablue__")

# Static identity values of the mocked gateway.
_gateway_type = "M2000"
_gateway_mac = "00:00:00:00:00:00"
_gateway_ver = ""


class AsyncQueue:
    """Tiny unbounded FIFO queue for asyncio tasks.

    Items are kept in a plain list; an ``asyncio.ThreadSafeFlag`` is used to
    wake the consumer when a producer adds an item.
    """

    def __init__(self):
        self._buffer = []
        self._flag = asyncio.ThreadSafeFlag()

    def put_nowait(self, item):
        """Append ``item`` and signal the waiting consumer (never blocks)."""
        self._buffer.append(item)
        self._flag.set()

    async def get(self):
        """Wait until at least one item is queued, then return the oldest."""
        # Re-check the buffer after every wake-up: the flag may have been set
        # for an item that was already consumed.
        while not self._buffer:
            await self._flag.wait()
        return self._buffer.pop(0)


async def send_cmd(url, method="GET", query=None, body=None):
    """Send one REST request to the gateway and return ``(ok, text)``.

    Args:
        url: Path on the gateway, e.g. ``"/gap/nodes"``; it is appended to
            ``http://<GATEWAY>``.
        method: HTTP method name.
        query: Optional query string without the leading ``"?"``. It is
            appended to the URL as-is (assumed to be already URL-encoded).
        body: Optional JSON *string*; it is decoded and sent as the JSON
            request body.

    Returns:
        ``(True, text)`` when the response status is 200, ``(False, text)``
        otherwise, where ``text`` is the response body.

    Raises:
        ValueError: if ``body`` is not valid JSON.
    """
    full_url = f"http://{GATEWAY}{url}"
    if query:
        # Previously `query` was accepted and logged but never sent.
        separator = "&" if "?" in full_url else "?"
        full_url = f"{full_url}{separator}{query}"

    print("send cmd start:", method, full_url, query, body)

    # `json=None` is the default of `session.request`, so one call covers
    # both the with-body and the without-body case.
    payload = json.loads(body) if body is not None else None

    async with aiohttp.ClientSession() as session:
        async with session.request(
            method=method, url=full_url, json=payload
        ) as resp:
            text = await resp.text()
            return resp.status == 200, text


async def connect(addr, params=None):
    """Ask the gateway to connect to a BLE device.

    Args:
        addr: Device address as a string like ``"00:11:22:33:44:55"``.
        params: Optional JSON string like
            ``'{"param1": "value1", "param2": "value2"}'`` sent as the
            request body.

    Returns:
        The ``(ok, text)`` tuple produced by ``send_cmd``.

    Raises:
        ValueError: if ``addr`` is empty.
    """
    if not addr:
        raise ValueError("Address cannot be empty")
    url = "/gap/nodes/{}/connection".format(addr)
    return await send_cmd(url, "POST", None, body=params)


async def disconnect(addr):
    """Ask the gateway to drop the connection to a BLE device.

    Args:
        addr: Device address as a string like ``"00:11:22:33:44:55"``.

    Returns:
        The ``(ok, text)`` tuple produced by ``send_cmd``.

    Raises:
        ValueError: if ``addr`` is empty.
    """
    if not addr:
        raise ValueError("Address cannot be empty")
    url = "/gap/nodes/{}/connection".format(addr)
    return await send_cmd(url, "DELETE")


async def get_connected_devices():
    """Issue ``GET /gap/nodes`` and return the ``(ok, text)`` result of ``send_cmd``."""
    return await send_cmd("/gap/nodes", "GET")


async def gatt_discover(addr):
    """Request the services/characteristics/descriptors listing of a device.

    Args:
        addr: Device address string.

    Returns:
        The ``(ok, text)`` tuple produced by ``send_cmd``.

    Raises:
        ValueError: if ``addr`` is empty.
    """
    if not addr:
        raise ValueError("Address cannot be empty")
    url = "/gatt/nodes/{}/services/characteristics/descriptors".format(addr)
    return await send_cmd(url, "GET")


async def gatt_read(addr, handle):
    """Read the value at a GATT handle of a device.

    Args:
        addr: Device address string.
        handle: GATT handle; any falsy value (``None``, ``0``, ``""``) is
            rejected.

    Returns:
        The ``(ok, text)`` tuple produced by ``send_cmd``.

    Raises:
        ValueError: if ``addr`` or ``handle`` is empty.
    """
    if not addr or not handle:
        raise ValueError("Address and handle cannot be empty")
    url = "/gatt/nodes/{}/handle/{}/value".format(addr, handle)
    return await send_cmd(url, "GET")


async def gatt_write(addr, handle, value):
    """Write ``value`` to a GATT handle of a device.

    The write is issued as a GET on
    ``/gatt/nodes/<addr>/handle/<handle>/value/<value>``.

    Args:
        addr: Device address string.
        handle: GATT handle; any falsy value is rejected.
        value: Value placed in the last URL segment. ``None`` and the empty
            string are rejected (an empty string would produce a truncated
            URL); other falsy values such as ``0`` are allowed.

    Returns:
        The ``(ok, text)`` tuple produced by ``send_cmd``.

    Raises:
        ValueError: if ``addr``, ``handle`` or ``value`` is empty.
    """
    if not addr or not handle or value is None or value == "":
        raise ValueError("Address, handle, and value cannot be empty")
    url = "/gatt/nodes/{}/handle/{}/value/{}".format(addr, handle, value)
    return await send_cmd(url, "GET")


class SSEClient:
    """Minimal Server-Sent-Events reader over a raw asyncio TCP stream.

    ``connect()`` opens a plain HTTP connection to ``host`` on port 80 and
    requests ``path`` with ``Accept: text/event-stream``. ``co_read()`` then
    reads the stream line by line and pushes every decoded JSON payload onto
    ``self.queue``.
    """

    def __init__(
        self,
        host: str,
        path: str,
        reconnect_delay: int = 3,
    ):
        """Store the connection settings; no I/O happens here.

        Args:
            host: Gateway host/IP.
            path: Request path including any query string.
            reconnect_delay: Seconds to wait before each reconnect attempt.
        """
        self.log = get_logger(self.__class__.__name__)
        self.host = host
        self.path = path
        self.reconnect_delay = reconnect_delay
        self.running = True
        self.queue = AsyncQueue()
        self.reader = None
        self.writer = None

    async def connect(self):
        """Open the TCP stream and send the SSE GET request."""
        self.log.info(f"connect to sse start: {self.host}{self.path}")
        self.reader, self.writer = await asyncio.open_connection(self.host, 80)
        req = (
            f"GET {self.path} HTTP/1.1\r\n"
            f"Accept: text/event-stream\r\n"
            f"Host: {self.host}\r\n"
            f"Connection: keep-alive\r\n"
            "\r\n"
        )
        self.writer.write(req.encode())
        await self.writer.drain()
        self.log.info("connect to sse ok")

    @staticmethod
    def _parse_event(line):
        """Decode one stripped stream line into an event, or return ``None``.

        Only lines that start with ``{`` / ``[`` or with ``data: {`` are
        treated as payloads; everything else (HTTP headers, chunk sizes,
        keep-alive comments) yields ``None``.

        Raises:
            ValueError: if the payload is not valid JSON.
            KeyError, IndexError, TypeError: if a ``bdaddrs`` entry does not
                have the expected shape.
        """
        if line.startswith("data: {"):
            # Strip only the leading SSE field name; a global replace would
            # also corrupt payloads that contain "data: " inside a value.
            line = line[len("data: "):]
        elif not line or line[0] not in ("{", "["):
            return None

        data = json.loads(line)

        # Flatten the first "bdaddrs" entry into top-level keys.
        if isinstance(data, dict) and "bdaddrs" in data:
            first = data["bdaddrs"][0]
            data["bdaddr"] = first["bdaddr"]
            data["bdaddrType"] = first["bdaddrType"]
            del data["bdaddrs"]
        return data

    async def _close(self):
        """Close the current stream, if any (best effort)."""
        writer, self.reader, self.writer = self.writer, None, None
        if writer is None:
            return
        try:
            writer.close()
            await writer.wait_closed()
        except OSError:
            # The socket is already dead; there is nothing left to release.
            pass

    async def _reconnect(self):
        """Retry ``connect()`` every ``reconnect_delay`` seconds until it
        succeeds or the client is stopped."""
        await self._close()
        while self.running:
            await asyncio.sleep(self.reconnect_delay)
            if not self.running:
                return
            self.log.info("reconnecting...")
            try:
                await self.connect()
                return
            except OSError as e:
                self.log.error("sse reconnect failed:", e)

    async def co_read(self):
        """Read events until ``stop()`` is called, reconnecting on failure.

        Parse errors are logged and the offending line is skipped; a closed
        or failing connection triggers a real reconnect.
        """
        while self.running:
            try:
                raw = await self.reader.readline()
                if not raw:
                    raise OSError("connection closed")
            except Exception as e:
                if not self.running:
                    # stop() closed the stream on purpose.
                    break
                self.log.error("sse disconnected:", e)
                await self._reconnect()
                continue

            try:
                line = raw.decode().strip()
                if not line:
                    continue
                self.log.debug("raw line:", line)
                data = self._parse_event(line)
            except (ValueError, KeyError, IndexError, TypeError) as e:
                self.log.info("parse data error:", e, raw)
                continue

            if data is not None:
                self.queue.put_nowait(data)

    async def stop(self):
        """Stop the read loop and release the socket."""
        self.running = False
        await self._close()


#######################
# scan sse
#######################

# SSE client of the scan stream; assigned by start_scan(), None until then.
scan_sse_client: Optional[SSEClient] = None


class BLEScanResult:
    """Endless async iterator over scan events.

    Each step waits for the next item on ``scan_sse_client.queue``. The
    module-level client is looked up on every step, so ``start_scan()`` must
    have been called before iterating.
    """

    def __aiter__(self):
        return self

    async def __anext__(self):
        return await scan_sse_client.queue.get()


def scan_result():
    """Return a new ``BLEScanResult`` async iterator."""
    return BLEScanResult()


async def start_scan(query: str = None):
    """Open the scan SSE stream (``/gap/nodes``) and start reading it.

    Args:
        query: Optional query string without the leading ``"?"``.
            ``event=1`` is prepended unless it is already one of the
            ``&``-separated parameters.

    Returns:
        ``(True, "OK")`` once the stream is connected and the reader task
        has been scheduled.
    """
    global scan_sse_client

    if not query:
        # Covers both None and "" (the latter used to yield "event=1&").
        qs = "event=1"
    elif "event=1" in query.split("&"):
        # Compare whole parameters so "event=10" is not mistaken for it.
        qs = query
    else:
        qs = "event=1&" + query

    log.info("start scan:", qs)

    # Stop the reader of a previous scan so it does not keep running.
    if scan_sse_client is not None:
        await scan_sse_client.stop()

    scan_sse_client = SSEClient(
        host=GATEWAY,
        path=f"/gap/nodes?{qs}",
    )

    await scan_sse_client.connect()

    asyncio.create_task(scan_sse_client.co_read())

    return True, "OK"


#######################
# notify sse
#######################

# SSE client of the notification stream; assigned by start_recv_notify(),
# None until then.
notify_sse_client: Optional[SSEClient] = None


class BLENotifyResult:
    """Endless async iterator over notification events.

    Each step waits for the next item on ``notify_sse_client.queue``. The
    module-level client is looked up on every step, so
    ``start_recv_notify()`` must have been called before iterating.
    """

    def __aiter__(self):
        return self

    async def __anext__(self):
        return await notify_sse_client.queue.get()


def notify_result():
    """Return a new ``BLENotifyResult`` async iterator."""
    return BLENotifyResult()


async def start_recv_notify(query: str = None):
    """Open the notification SSE stream (``/gatt/nodes``) and start reading it.

    Args:
        query: Optional query string without the leading ``"?"``.
            ``event=1`` is prepended unless it is already one of the
            ``&``-separated parameters.

    Returns:
        ``(True, "OK")`` once the stream is connected and the reader task
        has been scheduled.
    """
    global notify_sse_client

    if not query:
        # Covers both None and "" (the latter used to yield "event=1&").
        qs = "event=1"
    elif "event=1" in query.split("&"):
        # Compare whole parameters so "event=10" is not mistaken for it.
        qs = query
    else:
        qs = "event=1&" + query

    log.info("start notify:", qs)

    # Stop the reader of a previous stream so it does not keep running.
    if notify_sse_client is not None:
        await notify_sse_client.stop()

    notify_sse_client = SSEClient(
        host=GATEWAY,
        path=f"/gatt/nodes?{qs}",
    )

    await notify_sse_client.connect()

    asyncio.create_task(notify_sse_client.co_read())

    return True, "OK"


#######################
# state sse
#######################

# SSE client of the connection-state stream; assigned by
# start_recv_connection_state(), None until then.
state_sse_client: Optional[SSEClient] = None


class BLEConnectionResult:
    """Endless async iterator over connection-state events.

    Each step waits for the next item on ``state_sse_client.queue``. The
    module-level client is looked up on every step, so
    ``start_recv_connection_state()`` must have been called before iterating.
    """

    def __aiter__(self):
        return self

    async def __anext__(self):
        return await state_sse_client.queue.get()


def connection_result():
    """Return a new ``BLEConnectionResult`` async iterator."""
    return BLEConnectionResult()


async def start_recv_connection_state():
    """Open the connection-state SSE stream and start reading it.

    The stream path is ``/management/nodes/connection-state``.

    Returns:
        ``(True, "OK")`` once the stream is connected and the reader task
        has been scheduled.
    """
    global state_sse_client

    log.info("start state")

    # Stop the reader of a previous stream so it does not keep running.
    if state_sse_client is not None:
        await state_sse_client.stop()

    state_sse_client = SSEClient(
        host=GATEWAY,
        path="/management/nodes/connection-state",
    )

    await state_sse_client.connect()
    asyncio.create_task(state_sse_client.co_read())

    return True, "OK"


def set_gateway(ip: str):
    """Set the module-level ``GATEWAY`` host used by later requests.

    Args:
        ip: Gateway host/IP, stored as given (no validation is performed).
    """
    global GATEWAY
    GATEWAY = ip