Run Locally
MicroPython
supports multiple platforms and can be used to run and test basic application logic locally.
System Information
cat /etc/issue
Ubuntu 22.04 LTS \n \l
uname -a
Linux VM-0-9-ubuntu 5.15.0-106-generic
Download Source Code
git clone --branch v1.24.1 --depth 1 https://github.com/micropython/micropython.git
git -C micropython submodule update --init --recursive
Install Dependencies
sudo apt update
sudo apt install -y build-essential git pkg-config \
libffi-dev libssl-dev libbz2-dev liblzma-dev \
libreadline-dev libsqlite3-dev libgdbm-dev \
libncurses5-dev zlib1g-dev libmpdec-dev \
libmbedtls-dev libdb5.3-dev uuid-dev
Compile and Build
cd micropython/ports/unix
make clean
make -j$(nproc) MICROPY_PY_USER_CMODULE=0
# LINK build-standard/micropython
# text data bss dec hex filename
# 764607 69328 7088 841023 cd53f build-standard/micropython
Check Version
./build-standard/micropython --version
# MicroPython v1.24.1 on 2025-09-09; linux [GCC 11.2.0] version
Prepare Script
Create hello.py with the following command:
echo 'print("hello")' > hello.py
Run the Script
./build-standard/micropython hello.py
hello
Cassiablue API Mock
To debug code that uses the Cassiablue API, you can use the following implementation, which mocks the Cassiablue API on top of the gateway's RESTful API so that basic logic and functionality can be tested quickly.
Note
This mock is for local debugging of basic code logic only; it is not intended for stress or stability testing.
Treat the results of running the script on the M2000 gateway itself as authoritative.
"""
Module name: cassiablue.py
Purpose: Local MicroPython debugging mock. Exposes the same high-level API as the real gateway by forwarding calls to its REST + Server-Sent-Events interface, so you can develop and test your MicroPython logic on a PC without flashing firmware.
MicroPython compatibility: v1.24.1
Important: Do **not** copy this file into the gateway itself—the gateway already contains a native C implementation. This module is **only** for convenient desktop debugging and has no performance guarantees.
Usage: Set the variable `GATEWAY` to the actual IP address of your gateway, either by editing it below or by calling `set_gateway()`.
"""

# Address of the gateway. REST URLs are built as "http://<GATEWAY><path>" and
# the SSE clients open their connections to this host, so it has to be set
# before any API call is made.
GATEWAY = ""
10
11import json
12import asyncio
13
14from cassia_log import get_logger
15import aiohttp
16
17try:
18 from typing import Optional
19except ImportError:
20 pass
21
22
# Module-level logger.
log = get_logger("__mock_cassiablue__")

# Gateway identity placeholders. Nothing in the visible code reads them;
# presumably they mirror attributes of the native module — TODO confirm.
_gateway_type = "M2000"
_gateway_mac = "00:00:00:00:00:00"
_gateway_ver = ""
28
29
class AsyncQueue:
    """Minimal unbounded FIFO queue built on a list and an ``asyncio.ThreadSafeFlag``.

    Producers add items synchronously with ``put_nowait``; a consumer awaits
    ``get`` and is woken through the flag when something has been added.
    """

    def __init__(self):
        self._flag = asyncio.ThreadSafeFlag()
        self._buffer = []

    def put_nowait(self, item):
        """Append *item* to the tail of the queue and raise the wake-up flag."""
        self._buffer.append(item)
        self._flag.set()

    async def get(self):
        """Remove and return the oldest item, waiting on the flag while the queue is empty."""
        while True:
            if self._buffer:
                return self._buffer.pop(0)
            # The buffer is re-checked after every wake-up, so a stale flag
            # only costs one extra pass through the loop.
            await self._flag.wait()
43
44
async def send_cmd(url, method="GET", query=None, body=None):
    """Send one REST request to the gateway and report the outcome.

    Args:
        url: Path on the gateway, e.g. "/gap/nodes"; it is appended to
            "http://<GATEWAY>".
        method: HTTP method name.
        query: Optional query parameters, appended to the URL when given.
            Accepts a ready-made query string without the leading "?" or a
            dict of name/value pairs. NOTE(review): the native API's query
            format is not visible here — confirm.
        body: Optional JSON document as a string; it is parsed and sent as
            the JSON request body.

    Returns:
        A tuple ``(ok, text)`` where ``ok`` is True only for HTTP status 200
        and ``text`` is the response body.
    """
    url = f"http://{GATEWAY}{url}"

    print("send cmd start:", method, url, query, body)

    # Previously the query was only printed and silently dropped.
    if query:
        if isinstance(query, dict):
            query = "&".join("{}={}".format(k, v) for k, v in query.items())
        separator = "&" if "?" in url else "?"
        url = "{}{}{}".format(url, separator, query)

    # json=None is the default of session.request, so passing None is the
    # same as sending no body at all.
    payload = json.loads(body) if body is not None else None

    async with aiohttp.ClientSession() as session:
        async with session.request(method=method, url=url, json=payload) as resp:
            text = await resp.text()
            return resp.status == 200, text
70
71
async def connect(addr, params=None):
    """Ask the gateway to connect to the device at *addr*.

    Args:
        addr: Device address as a string like "00:11:22:33:44:55".
        params: Optional JSON string like
            '{"param1": "value1", "param2": "value2"}', sent as the body.

    Returns:
        The result of send_cmd for ``POST /gap/nodes/<addr>/connection``.

    Raises:
        ValueError: if *addr* is empty.
    """
    if addr:
        return await send_cmd(f"/gap/nodes/{addr}/connection", "POST", None, body=params)
    raise ValueError("Address cannot be empty")
79
80
async def disconnect(addr):
    """Ask the gateway to drop the connection to the device at *addr*.

    Args:
        addr: Device address as a string like "00:11:22:33:44:55".

    Returns:
        The result of send_cmd for ``DELETE /gap/nodes/<addr>/connection``.

    Raises:
        ValueError: if *addr* is empty.
    """
    if addr:
        return await send_cmd(f"/gap/nodes/{addr}/connection", "DELETE")
    raise ValueError("Address cannot be empty")
87
88
async def get_connected_devices():
    """Query ``GET /gap/nodes`` on the gateway and return send_cmd's result unchanged."""
    outcome = await send_cmd("/gap/nodes", "GET")
    return outcome
91
92
async def gatt_discover(addr):
    """Request the services, characteristics and descriptors of the device at *addr*.

    Returns:
        The result of send_cmd for
        ``GET /gatt/nodes/<addr>/services/characteristics/descriptors``.

    Raises:
        ValueError: if *addr* is empty.
    """
    if addr:
        return await send_cmd(
            f"/gatt/nodes/{addr}/services/characteristics/descriptors", "GET"
        )
    raise ValueError("Address cannot be empty")
98
99
async def gatt_read(addr, handle):
    """Read the value at *handle* on the device at *addr*.

    Returns:
        The result of send_cmd for
        ``GET /gatt/nodes/<addr>/handle/<handle>/value``.

    Raises:
        ValueError: if *addr* or *handle* is empty (any falsy value).
    """
    if not (addr and handle):
        raise ValueError("Address and handle cannot be empty")
    return await send_cmd(f"/gatt/nodes/{addr}/handle/{handle}/value", "GET")
105
106
async def gatt_write(addr, handle, value):
    """Write *value* to *handle* on the device at *addr*.

    The value travels in the URL path and the request is issued as a GET.

    Returns:
        The result of send_cmd for
        ``GET /gatt/nodes/<addr>/handle/<handle>/value/<value>``.

    Raises:
        ValueError: if *addr* or *handle* is empty, or *value* is None.
    """
    if addr and handle and value is not None:
        return await send_cmd(
            f"/gatt/nodes/{addr}/handle/{handle}/value/{value}", "GET"
        )
    raise ValueError("Address, handle, and value cannot be empty")
112
113
class SSEClient:
    """Line-oriented Server-Sent-Events reader for one gateway endpoint.

    Opens a plain HTTP/1.1 connection to port 80 of *host*, requests *path*
    as an event stream and pushes every decoded JSON event onto ``queue``.
    """

    def __init__(
        self,
        host: str,
        path: str,
        reconnect_delay: int = 3,
    ):
        """Store the connection settings; no network I/O happens here.

        Args:
            host: Gateway address to connect to.
            path: Request path including any query string.
            reconnect_delay: Seconds to wait before reconnecting after the
                stream fails.
        """
        self.log = get_logger(self.__class__.__name__)
        self.host = host
        self.path = path
        self.reconnect_delay = reconnect_delay
        self.running = True
        self.queue = AsyncQueue()
        self.reader = None
        self.writer = None

    async def connect(self):
        """Open the TCP connection and send the event-stream GET request.

        The HTTP response head is not parsed here; co_read() skips every
        line that does not look like an event.
        """
        self.log.info(f"connect to sse start: {self.host}{self.path}")
        self.reader, self.writer = await asyncio.open_connection(self.host, 80)
        req = (
            f"GET {self.path} HTTP/1.1\r\n"
            f"Accept: text/event-stream\r\n"
            f"Host: {self.host}\r\n"
            f"Connection: keep-alive\r\n"
            "\r\n"
        )
        self.writer.write(req.encode())
        await self.writer.drain()
        self.log.info("connect to sse ok")

    async def _close(self):
        """Drop the current connection, ignoring errors raised while closing."""
        writer = self.writer
        self.reader = None
        self.writer = None
        if writer is None:
            return
        try:
            writer.close()
            await writer.wait_closed()
        except Exception:
            # Best effort: the peer may already have torn the socket down.
            pass

    @staticmethod
    def _decode_event(line):
        """Decode one stripped stream line into an event object.

        Returns None for lines that are not events (HTTP headers, chunk
        sizes, comments). Raises whatever json.loads or the "bdaddrs"
        flattening raises for a malformed event.
        """
        prefix = "data: "
        if not line:
            return None
        if not (line[0] in ("{", "[") or line.startswith(prefix + "{")):
            return None

        # Strip only the leading marker; a blanket replace() would also
        # mangle payloads that contain "data: " inside a string value.
        if line.startswith(prefix):
            line = line[len(prefix):]
        line = line.replace("\r", "")

        data = json.loads(line)

        if "bdaddrs" in data:
            # Flatten the first address entry onto the event itself.
            first = data["bdaddrs"][0]
            data["bdaddr"] = first["bdaddr"]
            data["bdaddrType"] = first["bdaddrType"]
            del data["bdaddrs"]

        return data

    async def co_read(self):
        """Read the stream until stop() is called, queueing each decoded event.

        When the connection fails, it is closed and re-opened after
        ``reconnect_delay`` seconds. The connection is closed when the loop
        exits.
        """
        while self.running:
            if self.reader is None:
                try:
                    await self.connect()
                except Exception as e:
                    self.log.error("sse connect failed:", e)
                    await self._close()
                    await asyncio.sleep(self.reconnect_delay)
                    continue

            try:
                raw = await self.reader.readline()
                if not raw:
                    raise OSError("connection closed")
            except Exception as e:
                self.log.error("sse disconnected:", e)
                # Forget the dead stream so the next pass really reconnects
                # instead of reading from it again.
                await self._close()
                await asyncio.sleep(self.reconnect_delay)
                self.log.info("reconnecting...")
                continue

            try:
                line = raw.decode().strip()
                if not line:
                    continue
                self.log.debug("raw line:", line)
                data = self._decode_event(line)
            except Exception as e:
                self.log.info("parse data error:", e, raw)
                continue

            if data is not None:
                self.queue.put_nowait(data)

        await self._close()

    async def stop(self):
        """Ask co_read() to finish.

        The loop notices the flag once its pending readline() or sleep
        returns, and closes the connection on the way out.
        """
        self.running = False
186
187
#######################
# scan sse
#######################

# SSE client that feeds scan events; None until start_scan() creates it.
scan_sse_client: Optional[SSEClient] = None
193
194
class BLEScanResult:
    """Async iterator over the events queued by the scan SSE client.

    The iteration never ends on its own: ``__anext__`` simply waits for the
    next item on ``scan_sse_client.queue``.
    """

    def __aiter__(self):
        return self

    async def __anext__(self):
        """Wait for and return the next queued scan event."""
        return await scan_sse_client.queue.get()
203
204
def scan_result():
    """Return a fresh BLEScanResult iterator for use with ``async for``."""
    results = BLEScanResult()
    return results
207
208
async def start_scan(query: str = None):
    """Open the scan event stream and start reading it in the background.

    Args:
        query: Optional query string (without the leading "?") for
            ``/gap/nodes``. "event=1" is prepended unless the string already
            contains it; None or an empty string yields just "event=1".

    Returns:
        ``(True, "OK")`` once the connection is open and the reader task
        has been scheduled. Errors from SSEClient.connect() propagate.
    """
    global scan_sse_client

    if not query:
        # Covers both None and "", which used to produce a dangling "event=1&".
        qs = "event=1"
    elif "event=1" in query:
        qs = query
    else:
        qs = "event=1&" + query

    log.info("start scan:", qs)

    # Ask a client left over from an earlier call to stop, so its reader
    # task does not keep running after being replaced.
    if scan_sse_client is not None:
        await scan_sse_client.stop()

    scan_sse_client = SSEClient(
        host=GATEWAY,
        path=f"/gap/nodes?{qs}",
    )

    await scan_sse_client.connect()

    asyncio.create_task(scan_sse_client.co_read())

    return True, "OK"
232
233
#######################
# notify sse
#######################

# SSE client that feeds notification events; None until start_recv_notify() creates it.
notify_sse_client: Optional[SSEClient] = None
239
240
class BLENotifyResult:
    """Async iterator over the events queued by the notify SSE client.

    The iteration never ends on its own: ``__anext__`` simply waits for the
    next item on ``notify_sse_client.queue``.
    """

    def __aiter__(self):
        return self

    async def __anext__(self):
        """Wait for and return the next queued notification event."""
        return await notify_sse_client.queue.get()
249
250
def notify_result():
    """Return a fresh BLENotifyResult iterator for use with ``async for``."""
    results = BLENotifyResult()
    return results
253
254
async def start_recv_notify(query: str = None):
    """Open the notification event stream and start reading it in the background.

    Args:
        query: Optional query string (without the leading "?") for
            ``/gatt/nodes``. "event=1" is prepended unless the string already
            contains it; None or an empty string yields just "event=1".

    Returns:
        ``(True, "OK")`` once the connection is open and the reader task
        has been scheduled. Errors from SSEClient.connect() propagate.
    """
    global notify_sse_client

    if not query:
        # Covers both None and "", which used to produce a dangling "event=1&".
        qs = "event=1"
    elif "event=1" in query:
        qs = query
    else:
        qs = "event=1&" + query

    log.info("start notify:", qs)

    # Ask a client left over from an earlier call to stop, so its reader
    # task does not keep running after being replaced.
    if notify_sse_client is not None:
        await notify_sse_client.stop()

    notify_sse_client = SSEClient(
        host=GATEWAY,
        path=f"/gatt/nodes?{qs}",
    )

    await notify_sse_client.connect()

    asyncio.create_task(notify_sse_client.co_read())

    return True, "OK"
278
279
#######################
# state sse
#######################

# SSE client that feeds connection-state events; None until
# start_recv_connection_state() creates it.
state_sse_client: Optional[SSEClient] = None
285
286
class BLEConnectionResult:
    """Async iterator over the events queued by the connection-state SSE client.

    The iteration never ends on its own: ``__anext__`` simply waits for the
    next item on ``state_sse_client.queue``.
    """

    def __aiter__(self):
        return self

    async def __anext__(self):
        """Wait for and return the next queued connection-state event."""
        return await state_sse_client.queue.get()
295
296
def connection_result():
    """Return a fresh BLEConnectionResult iterator for use with ``async for``."""
    results = BLEConnectionResult()
    return results
299
300
async def start_recv_connection_state():
    """Open the connection-state event stream and start reading it in the background.

    Returns:
        ``(True, "OK")`` once the connection to
        ``/management/nodes/connection-state`` is open and the reader task
        has been scheduled. Errors from SSEClient.connect() propagate.
    """
    global state_sse_client

    log.info("start state")

    # Ask a client left over from an earlier call to stop, so its reader
    # task does not keep running after being replaced.
    if state_sse_client is not None:
        await state_sse_client.stop()

    state_sse_client = SSEClient(
        host=GATEWAY,
        path="/management/nodes/connection-state",
    )

    await state_sse_client.connect()
    asyncio.create_task(state_sse_client.co_read())

    return True, "OK"
315
316
def set_gateway(ip: str) -> None:
    """Rebind the module-level ``GATEWAY`` to *ip*.

    Requests and SSE clients created after this call use the new address.
    """
    global GATEWAY
    GATEWAY = ip