背景描述:我想写一个类似与下文的简单的逻辑
select {
case msg1 := <-messageChannel:
fmt.Println(msg1)
case msg2 := <-timerChannel:
fmt.Println(msg2)
}
这是一个很简单的select,分别对两种情况:消息抵达和timer到时间进行了不同的处理
然后当我使用python时,逻辑代码如下
from asyncio import Queue, Lock, create_task
tasks = [queue.get(), time_sleep]
done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
first_result = done.pop().result()
这样,我可以得到第一个完成的任务从而进行并发等待
然而,出现了一个异常诡异的事情:当time_sleep先完成时,下一个来自queue的消息必定丢失
经过一系列测试与debug,我发现这个queue的消息莫名其妙直接丢失了,没有执行到first_result = done.pop().result()这一行
这就更奇怪了
突然,我想到了go的channel中有一个等待队列,那么想必python这边也是有类似的实现,而我并没有把这个queue.get()所占用的位置给取消
但是我使用了"_"来显式忽略这一个任务,而我每次循环都会创新一个新的queue.get()
那么真相只有一个:队列中消失的消息是被丢进垃圾堆里的queue.get()抢走了
那么有什么办法避免这条消息被抢走吗
我最初想到的是采用task.cancel()来取消这个get任务,应该能解决消息被抢走的问题,但是如果cancel的时机非常精准,或者timer和queue.get同时就绪,那么这很容易导致消息丢失
那么第二套方案就是再额外封装一层,当任务没被触发时放进tasks进行第二轮等待
所以最终的逻辑代码如下:
from asyncio import Queue, Lock, create_task
time_sleep = asyncio.create_task(asyncio.sleep(interval))
queue_get = asyncio.create_task(queue.get())
tasks = [queue.get(), time_sleep]
done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
first_result = done.pop().result()
if first_result ==队列元素:
queue_get = asyncio.create_task(queue.get())
else:
time_sleep = asyncio.create_task(asyncio.sleep(interval))
看上去很简单的方案,实际上陪gpt和代码折腾了两个小时才找到
结论:在python中实现并发等待逻辑时,最好对每个协程进行额外封装并使用变量管理,非必要不创建新协程
queue.get()里面存在等待队列且不会因为被使用_而取消占用,需要手动取消