在这种设置下,确实无法实现在process_sentence
函数等待asyncio.sleep()
时继续从流中获取下一个字符。因为await
关键字实际上会阻塞当前运行的任务或协程。
在以下代码片段中:
async def main():
i = 0
async for sentence in sentences_generator():
print("processing sentence: ", i)
await process_sentence(sentence)
i += 1
当你看到"获取字符:"
字符串输出时,是在第一行async for
执行的时候,但是在此之后,await process_sentence(sentence)
会阻塞main()
函数的执行,导致在处理句子时不能同时获取新的字符。
替代方案是使用队列(Queue)。生产字符的任务(producer)可以将字符放入队列中,而使用字符的任务(consumer)可以从队列中取出项。这样即使消费者还在等待处理字符的过程中,生产者也可以持续向队列填充字符。它们各自独立工作。
以下是使用队列改进后的代码示例:
import asyncio
async def stream():
char_string = "Hi. Hello. Thank you."
for char in char_string:
await asyncio.sleep(0.1)
print("got:", char)
yield char
async def sentences_generator(q: asyncio.Queue[str], flag: asyncio.Event):
sentence = ""
async for char in stream():
sentence += char
if char in [".", "!", "?"]:
print("got sentence : ", sentence)
await q.put(sentence)
sentence = ""
flag.set()
async def process_sentence(q: asyncio.Queue[str], flag: asyncio.Event):
global i
while not (q.empty() and flag.is_set()):
item = await q.get()
print("processing sentence: ", i)
print("waiting for processing sentence: ", item)
await asyncio.sleep(len(item) * 0.1)
print("sentence processed!")
i += 1
async def main():
global i
i = 1
event = asyncio.Event()
queue = asyncio.Queue[str]()
producer_task = asyncio.create_task(sentences_generator(queue, event))
consumer_task = asyncio.create_task(process_sentence(queue, event))
await asyncio.gather(producer_task, consumer_task)
asyncio.run(main())