使用 ctypes 对接 C++的 dll 接口,封装为 websocket 接口方便网络调用。使用了 Redis 的 PUB/SUB 来监听服务端主动发起的消息通知。
使用了 asyncio 库做异步任务来处理 websocket 的请求通知,使用 aioredis 来进行管理 Redis 连接。
入口代码如下:
async def main():
redis = aioredis.from_url(f'redis://:{redis_password}@{redis_host}:{redis_port}', encoding='utf-8')
redis_task = asyncio.create_task(redis_subscriber_start_websocket(redis))
server_task = websockets.serve(msdk_handler, "0.0.0.0", 8765)
asyncio.create_task(monitor_loop())
print("WebSocket 服务器启动在 ws://localhost:8765:redis 地址: ", redis_host, redis_port)
await asyncio.gather(server_task, redis_task)
if __name__ == "__main__":
asyncio.run(main())
大致的 websocket 处理流程如下:
try:
async for message in websocket:
data = json.loads(message)
# print(f"收到 websocket 消息: {data}")
await messageHandler(data, connected[random_id_str])
print("WebSocket 连接正常关闭") # 添加这行来确认循环是否正常结束
await clearnWebscoket(random_id_str)
async def messageHandler(data, connected):
if data['action'] == 'something':
# 切换位置
results = await change_something(connected['client_id'], data)
await connected['websocket'].send(json.dumps(results))
async def change_something(client_id):
future = asyncio.Future()
futures_change_chara_pos[client_id] = future
# 调用 DLL 中的更改角色位置函数
sdk.SDK_change_something(c_char_p(client_id.encode('utf-8')),callback_instance)
while not future.done():
await asyncio.sleep(0)
result = await future
return result
在 callback_instance 回调中,我会设置 future 的内容。
def set_futures_status(client_id, futures_obj, data):
if client_id in futures_obj:
future = futures_obj.pop(client_id)
if not future.done():
future.set_result(data)
对项目的异步和 Redis 消息有点不理解。
1 、有一个播放音频的 C++接口,会频繁触发回调函数,我发现在播放音频的时候,接收了其他的 websocket 消息,发现好像没有处理,处理函数的日志没有打印出来 [偶现正常] 。不知道是不是我异步有问题,阻塞了主线程导致的。(作为一个前端开发,python 的异步好难懂,大佬们有推荐的书籍吗)
2 、Redis 消息一会后,就不会接收到订阅的消息。 消息的发布和接收频道一致,并且重启 python 后服务正常。
能帮忙解决一下目前碰到的 2 个问题,特别是异步的代码,我不知道自己写的是否正确。
while not future.done():
await asyncio.sleep(0)
result = await future
上面的代码就是我发现 await future 后会一直阻塞主进程代码运行。所以在 AI 的提示下,加上了代码后正常运行。但是 websocket 消息处理中的 await 又不会阻塞整个主进程运行,就很疑惑。
目前预算 500 ,可谈。代码地址: https://github.com/mydaoyuan/pythonAndDll
联系方式:d29zaGl0ZHkxMjM0NTY=
1
zombiecong 123 天前 1
python 的异步并不是多线程和多进程,需要并发还要用 ThreadPoolExecutor 或者 ProcessPoolExecutor
https://docs.python.org/3/library/concurrent.futures.html |
2
mightybruce 123 天前 1
调用 C++的 dll 接口 是会阻塞的,
asyncio 是协程,线程阻塞了, 协程肯定是会阻塞的, sdk 相关的代码单独测试吧,可以搞一个并发多进程队列, 发数据扔给队列就继续处理接受数据,sdk 相关的代码不断从队列里面取数据。 |
3
pursuer 123 天前 1
如果在使用 asyncio 的过程中使用了多线程回调时需要注意,asyncio 中很多 API 并不是线程安全的,不能跨线程调用
比如 set_futures_status 这个函数就应该通过如下模式调用 asyncio.run_coroutine_threadsafe(set_futures_status(a1,a2,a3),eventloop) |
4
pursuer 123 天前 1
顺便一提,此时 set_futures_status 应该改成 async def
eventloop 是你创建 future 的线程的 eventloop ,可以通过 get_event_loop()获取 这样可以解决无法用 future 唤醒导致的阻塞的问题。 while not future.done(): --await asyncio.sleep(0) result = await future 这种写法就是个坑,相当于轮循,完全没发挥协程的优势,不建议使用这种写法 |
6
fds 123 天前 1
python 感觉还是写阻塞的代码比较流畅,异步是后面塞进来的,得对底层多一些了解。阻塞的逻辑得单独放在个线程里处理。要异步不能直接用 nodejs 吗?虽然 nodejs 写不好也可能阻塞,但毕竟设计之初就是异步模式,大部分常用 IO 也都包装好了。前端上手 js 也熟练些。
|
8
Tdy95 OP @fds node 对接 dll 和 so 的能力还是太弱了,python 这块方便很多。Python 之前么咋用过,全靠 AI 磕磕碰碰写出来的代码。结果被异步搞自闭了 T T
|
9
fds 123 天前
@Tdy95 哦 这样呀。其实就像前面一些回复说的把 dll 调用扔到个线程就行。我也不太熟悉,问了下 gpt ,给出的代码是
import asyncio from concurrent.futures import ThreadPoolExecutor executor = ThreadPoolExecutor(max_workers=4) def dll_call(client_id): # 同步调用 DLL 函数 sdk.SDK_change_something(c_char_p(client_id.encode('utf-8')), callback_instance) # 假设这里返回结果 return "result from dll" async def change_something(client_id): loop = asyncio.get_running_loop() # 在后台线程中执行 DLL 调用,避免阻塞事件循环 result = await loop.run_in_executor(executor, dll_call, client_id) return result 确实就跟前面几楼的回复一样呢,也算是挺清晰的。 |
10
sujin190 123 天前
这还不简单,asyncio 的运行线程和 dll 运行线程不能是同一个啊,否则一个线程同一个时刻只能干一件事情,asyncio await 检测到有 io 阻塞会切换 python 栈帧,但是如果你是重 cpu 一直在计算 await 也会阻塞的
|
11
LinePro 123 天前
问一下,印象中 Future 是可以直接 await 等待结果的。那 while not future.done() 这种轮询式的写法意义何在?
|
12
Tdy95 OP @LinePro 我是发现在调用 dll 后,5s 后返回结果。 但是再等 5s 后才会执行 await 后的代码。我是让 AI 给出的 while not future.done() 代码
|
14
Tdy95 OP @pursuer 我已经修改了,发现 future 一直没变化。
``` def callback_speak_by_audio(code, status, frame_id, client_id): client_id = client_id.decode('utf-8') # 解码客户端 ID status = json.loads(status.decode('utf-8')) # 假设 status 是 UTF-8 编码的字符串 print(f"callback_speak_by_audio=====run, {client_id in futures_speak_by_audio_stream}") if status["data"]["FrameId"] == -1: if code == MSDKStatus.MSDK_SUCCESS_SPEAK_BY_AUDIO_FINISH.value: print(f"语音说话: {code}, 客户端 ID: {client_id}") asyncio.run_coroutine_threadsafe(set_futures_status_async(client_id, futures_speak_by_audio_stream, {"code": 204, "status": status, "name": "speak_by_audio", "success": True, "client_id": client_id}), websocketAll[client_id]['main_loop']) async def set_futures_status_async(client_id, futures_obj, data): if client_id in futures_obj: future = futures_obj.pop(client_id) if not future.done(): future.set_result(data) ``` 外部调用形式为: ``` if connected["audio_future"] is None: feature = asyncio.Future() connected["audio_future"] = feature print("Creating sendAudioEndData task") asyncio.create_task(sendAudioEndData(connected, feature)) async def sendAudioEndData(connected, feature): print("发送音频结束数据") try: result = await feature # 等待 feature 完成 print(f"发送音频结束数据: {result}") await connected['websocket'].send(json.dumps(result)) except Exception as e: print(f"发送数据时发生错误: {e}") finally: connected["audio_future"] = None # 确保音频 future 被正确重置 ``` 麻烦大佬再指点一下,是哪里不对呢?从日志结果看,“语音说话:”的日志已经执行。 但是后续的 sendAudioEndData 的 print(f"发送音频结束数据: {result}")没有执行。 |
15
Tdy95 OP |
16
pursuer 120 天前
@Tdy95 不知道你的 websocketAll[client_id]['main_loop']是哪里来的,future 正常唤醒是要求你的 future 创建和 set_result 在同一个线程/event_loop 中执行,要看着两个地方。
|
17
Tdy95 OP @pursuer 我是直接在 message_handle.py 文件创建了一个全局变量。main_loop = asyncio.get_event_loop()
|