運籌帷幄決勝千里,Python3.10原生協程asyncio工業級真實協程非同步消費任務排程實踐
我們一直都相信這樣一種說法:協程是比多執行緒更高效的一種併發工作方式,它完全由程式本身所控制,也就是在使用者態執行,協程避免了像執行緒切換那樣產生的上下文切換,在效能方面得到了很大的提升。毫無疑問,這是顛撲不破的業界共識,是放之四海而皆準的真理。
但事實上,協程遠比大多數人想象中的複雜,正因為協程的“使用者態”特性,任務排程權掌握在撰寫協程任務的人手裡,而僅僅依賴async和await關鍵字遠遠達不到“排程”的級別,有時候反而會拖累任務效率,使其在任務執行效率上還不及“系統態”的多執行緒和多程序,本次我們來探討一下Python3原生協程任務的排程管理。
Python3.10協程庫async.io的基本操作
事件迴圈(Eventloop)是 原生協程庫asyncio 的核心,可以理解為總指揮。Eventloop例項提供了註冊、取消和執行任務和回撥的方法。
Eventloop可以將一些非同步方法繫結到事件迴圈上,事件迴圈會迴圈執行這些方法,但是和多執行緒一樣,同時只能執行一個方法,因為協程也是單執行緒執行。當執行到某個方法時,如果它遇到了阻塞,事件迴圈會暫停它的執行去執行其他的方法,與此同時為這個方法註冊一個回撥事件,當某個方法從阻塞中恢復,下次輪詢到它的時候將會繼續執行,亦或者,當沒有輪詢到它,它提前從阻塞中恢復,也可以通過回撥事件進行切換,如此往復,這就是事件迴圈的簡單邏輯。
而上面最核心的動作就是切換別的方法,怎麼切換?用await關鍵字:
import asyncio async def job1(): print('job1開始') await asyncio.sleep(1) print('job1結束') async def job2(): print('job2開始') async def main(): await job1() await job2() if __name__ == '__main__': asyncio.run(main())
系統返回:
job1開始 job1結束 job2開始
是的,切則切了,可切的對嗎?事實上這兩個協程任務並沒有達成“協作”,因為它們是同步執行的,所以並不是在方法內await了,就可以達成協程的工作方式,我們需要併發啟動這兩個協程任務:
import asyncio async def job1(): print('job1開始') await asyncio.sleep(1) print('job1結束') async def job2(): print('job2開始') async def main(): #await job1() #await job2() await asyncio.gather(job1(), job2()) if __name__ == '__main__': asyncio.run(main())
系統返回:
job1開始 job2開始 job1結束
如果沒有asyncio.gather的參與,協程方法就是普通的同步方法,就算用async聲明瞭非同步也無濟於事。而asyncio.gather的基礎功能就是將協程任務併發執行,從而達成“協作”。
但事實上,Python3.10也支援“同步寫法”的協程方法:
async def create_task(): task1 = asyncio.create_task(job1()) task2 = asyncio.create_task(job2()) await task1 await task2
這裡我們通過asyncio.create_task對job1和job2進行封裝,返回的物件再通過await進行呼叫,由此兩個單獨的非同步方法就都被繫結到同一個Eventloop了,這樣雖然寫法上同步,但其實是非同步執行:
import asyncio async def job1(): print('job1開始') await asyncio.sleep(1) print('job1結束') async def job2(): print('job2開始') async def create_task(): task1 = asyncio.create_task(job1()) task2 = asyncio.create_task(job2()) await task1 await task2 async def main(): #await job1() #await job2() await asyncio.gather(job1(), job2()) if __name__ == '__main__': asyncio.run(create_task())
系統返回:
job1開始 job2開始 job1結束
協程任務的上下游監控
解決了併發執行的問題,現在假設每個非同步任務都會返回一個操作結果:
async def job1(): print('job1開始') await asyncio.sleep(1) print('job1結束') return "job1任務結果" async def job2(): print('job2開始') return "job2任務結果"
通過asyncio.gather方法,我們可以收集到任務執行結果:
async def main(): res = await asyncio.gather(job1(), job2()) print(res)
併發執行任務:
import asyncio async def job1(): print('job1開始') await asyncio.sleep(1) print('job1結束') return "job1任務結果" async def job2(): print('job2開始') return "job2任務結果" async def main(): res = await asyncio.gather(job1(), job2()) print(res) if __name__ == '__main__': asyncio.run(main())
系統返回:
job1開始 job2開始 job1結束 ['job1', 'job2']
但任務結果僅僅也就是方法的返回值,除此之外,並沒有其他有價值的資訊,對協程任務的執行明細諱莫如深。
現在我們換成asyncio.wait方法:
async def main(): res = await asyncio.wait([job1(), job2()]) print(res)
依然併發執行:
import asyncio async def job1(): print('job1開始') await asyncio.sleep(1) print('job1結束') return "job1任務結果" async def job2(): print('job2開始') return "job2任務結果" async def main(): res = await asyncio.wait([job1(), job2()]) print(res) if __name__ == '__main__': asyncio.run(main())
系統返回:
job1開始 job2開始 job1結束 ({<Task finished name='Task-2' coro=<job1() done, defined at /Users/liuyue/Downloads/upload/test/test_async.py:4> result='job1任務結果'>, <Task finished name='Task-3' coro=<job2() done, defined at /Users/liuyue/Downloads/upload/test/test_async.py:12> result='job2任務結果'>}, set())
可以看出,asyncio.wait返回的是任務物件,裡面儲存了大部分的任務資訊,包括執行狀態。
在預設情況下,asyncio.wait會等待全部任務完成 (return_when='ALL_COMPLETED'),它還支援 return_when='FIRST_COMPLETED'(第一個協程完成就返回)和 return_when='FIRST_EXCEPTION'(出現第一個異常就返回)。
這就非常令人興奮了,因為如果非同步消費任務是發簡訊之類的需要統計達到率的任務,利用asyncio.wait特性,我們就可以第一時間記錄任務完成或者異常的具體時間。
協程任務守護
假設由於某種原因,我們手動終止任務消費:
import asyncio async def job1(): print('job1開始') await asyncio.sleep(1) print('job1結束') return "job1任務結果" async def job2(): print('job2開始') return "job2任務結果" async def main(): task1 = asyncio.create_task(job1()) task2 = asyncio.create_task(job2()) task1.cancel() res = await asyncio.gather(task1, task2) print(res) if __name__ == '__main__': asyncio.run(main())
系統報錯:
File "/Users/liuyue/Downloads/upload/test/test_async.py", line 23, in main res = await asyncio.gather(task1, task2) asyncio.exceptions.CancelledError
這裡job1被手動取消,但會影響job2的執行,這違背了協程“互相提攜”的特性。
事實上,asyncio.gather方法可以捕獲協程任務的異常:
import asyncio async def job1(): print('job1開始') await asyncio.sleep(1) print('job1結束') return "job1任務結果" async def job2(): print('job2開始') return "job2任務結果" async def main(): task1 = asyncio.create_task(job1()) task2 = asyncio.create_task(job2()) task1.cancel() res = await asyncio.gather(task1, task2,return_exceptions=True) print(res) if __name__ == '__main__': asyncio.run(main())
系統返回:
job2開始 [CancelledError(''), 'job2任務結果']
可以看到job1沒有被執行,並且異常替代了任務結果作為返回值。
但如果協程任務啟動之後,需要保證任務情況下都不會被取消,此時可以使用asyncio.shield方法守護協程任務:
import asyncio async def job1(): print('job1開始') await asyncio.sleep(1) print('job1結束') return "job1任務結果" async def job2(): print('job2開始') return "job2任務結果" async def main(): task1 = asyncio.shield(job1()) task2 = asyncio.create_task(job2()) res = await asyncio.gather(task1, task2,return_exceptions=True) task1.cancel() print(res) if __name__ == '__main__': asyncio.run(main())
系統返回:
job1開始 job2開始 job1結束 ['job1任務結果', 'job2任務結果']
協程任務回撥
假設協程任務執行完畢之後,需要立刻進行回撥操作,比如將任務結果推送到其他介面服務上:
import asyncio async def job1(): print('job1開始') await asyncio.sleep(1) print('job1結束') return "job1任務結果" async def job2(): print('job2開始') return "job2任務結果" def callback(future): print(f'回撥任務: {future.result()}') async def main(): task1 = asyncio.shield(job1()) task2 = asyncio.create_task(job2()) task1.add_done_callback(callback) res = await asyncio.gather(task1, task2,return_exceptions=True) print(res) if __name__ == '__main__': asyncio.run(main())
這裡我們通過add_done_callback方法對job1指定了callback方法,當任務執行完以後,callback會被呼叫,系統返回:
job1開始 job2開始 job1結束 回撥任務: job1任務結果 ['job1任務結果', 'job2任務結果']
與此同時,add_done_callback方法不僅可以獲取協程任務返回值,它自己也支援引數引數傳遞:
import asyncio from functools import partial async def job1(): print('job1開始') await asyncio.sleep(1) print('job1結束') return "job1任務結果" async def job2(): print('job2開始') return "job2任務結果" def callback(future,num): print(f"回撥引數{num}") print(f'回撥任務: {future.result()}') async def main(): task1 = asyncio.shield(job1()) task2 = asyncio.create_task(job2()) task1.add_done_callback(partial(callback,num=1)) res = await asyncio.gather(task1, task2,return_exceptions=True) print(res) if __name__ == '__main__': asyncio.run(main())
系統返回:
job1開始 job2開始 job1結束 回撥引數1 回撥任務: job1任務結果 ['job1任務結果', 'job2任務結果']
結語
成也使用者態,敗也使用者態。所謂水能載舟亦能覆舟,協程消費任務的排程遠比多執行緒的系統級排程要複雜,稍不留神就會造成業務上的“同步”阻塞,弄巧成拙,適得其反。這也解釋了為什麼相似場景中多執行緒的出場率要遠遠高於協程,就是因為多執行緒不需要考慮啟動後的“切換”問題,無為而為,簡單粗暴。
- Lock 鎖底層實現
- 【微服務】- 服務呼叫 - OpenFeign
- JDK19新特性使用詳解
- day05-離線留言和離線檔案
- #Lua:Lua呼叫C 生成的DLL庫
- MySQL面試題
- 各程式語言 aardio 相互呼叫示例
- Rust學習入門
- MySql的InnoDB的三層B 樹可以儲存兩千萬左右條資料的計算邏輯
- 組合總和 II
- Java基礎(識別符號,資料型別,資料轉換,變數)
- vue3 的 ref、isRef、toRef、toRefs、toRaw 詳細介紹
- aspnetcore6.0原始碼編譯除錯
- .NET 反向代理 YARP 通過編碼方式配置域名轉發
- .Net 不受 美國出口管理條例(EAR) 的約束
- 徹底掌握Makefile(二)
- 如何通過Java應用程式建立Word表格
- dotnet7 aot編譯實戰
- MasaFramework的MinimalAPI設計
- Spring入門(二):SpringBoot之基礎Web開發