新聞中心
先前有公眾號(hào)朋友問(wèn)起一個(gè)問(wèn)題,大概的問(wèn)題是這樣:

在石樓等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場(chǎng)前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都做網(wǎng)站、網(wǎng)站建設(shè) 網(wǎng)站設(shè)計(jì)制作專業(yè)公司,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),成都品牌網(wǎng)站建設(shè),全網(wǎng)營(yíng)銷推廣,外貿(mào)網(wǎng)站制作,石樓網(wǎng)站建設(shè)費(fèi)用合理。
在異步接口里面接收批量上傳的文件夾后通過(guò)webdav3進(jìn)行批量進(jìn)行文件處理。其實(shí)涉及的問(wèn)題就是:相對(duì)于在異步之中進(jìn)行線程化的異步處理。
大致的代碼如下所示:
@uploadrp.post('/upload/rp')
async def upload_rp_file(file: List[UploadFile] = File(...)):
........
upload_result = await new_upload_file(file_path=file_path, file_name=file_name, old_name=rp_dir_name)
.........其中對(duì)應(yīng)的new_upload_file也是異步函數(shù),但是里面涉及到webdav3庫(kù)的異步的調(diào)用,如下代碼所示:
async def new_upload_file(file_path, file_name, old_name):
""" 上傳文件夾 """
.........
upload_rp_file_result = client.upload_async('' + file_name, file_path,
callback=partial(completion_callback, old_file_dir=old_name, file_name=file_name))
.........容納后在調(diào)用upload_async的使用,需要進(jìn)行任務(wù)處理完成的回調(diào),也就是completion_callback函數(shù),它的代碼如下:
def completion_callback(old_file_dir, file_name):
pss它的問(wèn)題就是在completion_callback還有一個(gè)異步的函數(shù)需要調(diào)用的時(shí)候就遇到問(wèn)題了:
(1) 我無(wú)法在upload_async上傳文件完成后的回調(diào)函數(shù)使用async,即 async def completion_callback
(2) 我在completion_callback中獲取當(dāng)前事件循環(huán),因?yàn)閳?zhí)行這個(gè)回調(diào)函數(shù)的時(shí)候,控制臺(tái)打印當(dāng)前沒(méi)有事件循環(huán)
3) 我在completion_callback中new一個(gè)事件循環(huán)(new_event_loop)然后用create_task或run_until_complete執(zhí)行update_rp_file_status函數(shù)的時(shí)候,都不成功:
- 要不然就報(bào)update_rp_file_status函數(shù)跟new的事件循環(huán)不是同一個(gè)...
- 要不就是這個(gè)update_rp_file_status壓根不執(zhí)行...
- 要不就是還沒(méi)執(zhí)行,new出來(lái)的事件循環(huán),在update_rp_file_status還沒(méi)執(zhí)行的時(shí)候就被銷毀了,很多問(wèn)題...
問(wèn)題探究
其實(shí)這個(gè)問(wèn)題本質(zhì)其實(shí)就是關(guān)于 client.upload_async中啟用了新的線程去異步處理了,導(dǎo)致了開(kāi)啟了新的線程的問(wèn)題,如下 client.upload_async的代碼:
由此可見(jiàn),問(wèn)題肯本原因在于是因?yàn)槲覀兪窃谛戮€程中調(diào)用了異步函數(shù),而新線程沒(méi)有自己的事件循環(huán)導(dǎo)致的。
回到事件循環(huán)的本身上,我們知道首先需要知道一點(diǎn):
異步函數(shù)需要在事件循環(huán)中運(yùn)行。但是由于我們啟動(dòng)服務(wù)之后,在主線程中,F(xiàn)astAPI應(yīng)用程序已經(jīng)創(chuàng)建了一個(gè)事件循環(huán),并且通過(guò)uvicorn.run()方法來(lái)運(yùn)行該應(yīng)用程序,而這個(gè)的事件循環(huán)對(duì)象是當(dāng)前主線程創(chuàng)建出來(lái)的, 而當(dāng)我們?cè)谛戮€程中,也就是client.upload_async調(diào)用之后,開(kāi)啟的線程中并沒(méi)有默認(rèn)的事件循環(huán)可用。因此,當(dāng)你嘗試在新線程中調(diào)用異步函數(shù)時(shí),就會(huì)拋出RuntimeError: There is no current event loop in thread異常,甚至你嘗試打印獲取當(dāng)前運(yùn)行的事件循環(huán)都無(wú)法獲取到。如下代碼所示:
from fastapi import FastAPI
import asyncio
app = FastAPI()
from threading import Thread # 創(chuàng)建線程的模塊
def task(name):
print(asyncio.get_event_loop())
async def do_task():
# 異步任務(wù)的邏輯
await asyncio.sleep(1)
print("異步任務(wù)完成")
@app.get("/items")
async def read_item():
p = Thread(target=task, args=('線程1',))
p.start()
return {"data": "ok"}在上面的等待嗎中我們直接輸出當(dāng)前 print(asyncio.get_event_loop())也會(huì)遇到如下類似的錯(cuò)誤提示:
問(wèn)題解決
解決此類的問(wèn)題,開(kāi)始的時(shí)候,因?yàn)榫唧w可能沒(méi)了解帶業(yè)務(wù)細(xì)節(jié)和邏輯,所以給的思路也有點(diǎn)偏差,后來(lái)自己嘗試后,其實(shí)才發(fā)現(xiàn)原來(lái)只是一個(gè)比較簡(jiǎn)單的問(wèn)題,此類的問(wèn)題,我們可以在新線程中創(chuàng)建一個(gè)新的事件循環(huán),并將任務(wù)添加到該事件循環(huán)中,以確保異步函數(shù)能夠正確運(yùn)行。這樣,每個(gè)線程都有自己的事件循環(huán),可以獨(dú)立地運(yùn)行異步任務(wù)。也就是如下的代碼:
from fastapi import FastAPI
import asyncio
app = FastAPI()
from threading import Thread # 創(chuàng)建線程的模塊
def task(name):
print(asyncio.get_event_loop())
# 嘗試去執(zhí)行異步任務(wù)
asyncio.run(do_task())
async def do_task():
# 異步任務(wù)的邏輯
await asyncio.sleep(1)
print("異步任務(wù)完成")
@app.get("/items")
async def read_item():
p = Thread(target=task, args=('線程1',))
p.start()
return {"data": "ok"}在上面的代碼中使用 asyncio.run開(kāi)啟新的事件循環(huán)去處理異步函數(shù)即可。但是之前那個(gè)朋友也說(shuō)它嘗試夠使用在completion_callback中new一個(gè)事件循環(huán)(new_event_loop)然后用create_task或run_until_complete執(zhí)行update_rp_file_status函數(shù)的時(shí)候,都不成功,或許可能得原因是它創(chuàng)建新的事件循環(huán)之后,沒(méi)有進(jìn)行重新設(shè)置,所以會(huì)導(dǎo)致無(wú)法設(shè)置成功,也就是我們也可以改為如下代碼進(jìn)行運(yùn)行:
def task(name):
loop = asyncio.new_event_loop() # 創(chuàng)建新的事件循環(huán)
asyncio.set_event_loop(loop) # 設(shè)置新的事件循環(huán)為當(dāng)前線程的事件循環(huán)
loop.run_until_complete(do_task()) # 在新的事件循環(huán)中運(yùn)行異步任務(wù)關(guān)于asyncio.run 和手動(dòng)的設(shè)置創(chuàng)建事件循環(huán)
asyncio.run()是從Python 3.7版本引入的一個(gè)方便的函數(shù),它用于運(yùn)行異步函數(shù)。它自身會(huì)自動(dòng)創(chuàng)建一個(gè)新的事件循環(huán),并將異步函數(shù)添加到該事件循環(huán)中運(yùn)行。在異步函數(shù)完成后,它會(huì)關(guān)閉事件循環(huán)并返回結(jié)果。而我們的
loop = asyncio.new_event_loop() # 創(chuàng)建新的事件循環(huán)
asyncio.set_event_loop(loop) # 設(shè)置新的事件循環(huán)為當(dāng)前線程的事件循環(huán)
loop.run_until_complete(do_task()) # 在新的事件循環(huán)中運(yùn)行異步是一種手動(dòng)創(chuàng)建、設(shè)置和運(yùn)行事件循環(huán)的方法包括:
使用asyncio.new_event_loop()創(chuàng)建一個(gè)新的事件循環(huán)。使用asyncio.set_event_loop(loop)將新創(chuàng)建的事件循環(huán)設(shè)置為當(dāng)前線程的事件循環(huán)。使用loop.run_until_complete(do_task())在新的事件循環(huán)中運(yùn)行異步任務(wù),直到任務(wù)完成。
他們之間的區(qū)別在于使用asyncio.run()時(shí),它會(huì)自動(dòng)處理事件循環(huán)的創(chuàng)建、設(shè)置和關(guān)閉,非常方便。而手動(dòng)創(chuàng)建、設(shè)置和運(yùn)行事件循環(huán)需要更多的代碼來(lái)處理這些步驟,但也提供了更多的靈活性和控制權(quán)。
當(dāng)前標(biāo)題:關(guān)于FastAPI中在新線程里調(diào)用協(xié)成函數(shù)問(wèn)題
轉(zhuǎn)載來(lái)于:http://www.5511xx.com/article/cospihc.html


咨詢
建站咨詢
