- 多進程 (Multiprocessing): 操作系統層面的並行。每個進程有自己獨立的記憶體空間,進程之間互不影響。適合執行 CPU 密集型 任務,可以充分利用多核 CPU。創建和銷毀進程開銷較大。
- 多線程 (Multithreading): 在同一個進程內創建多個執行流。線程共享進程的記憶體空間。適合執行 I/O 密集型 任務(如網路請求、檔案讀寫),因為在等待 I/O 時,其他線程可以繼續執行。但在 Python 中受 GIL (Global Interpreter Lock) 的限制,同一時刻只有一個線程能執行 Python 字節碼,所以在 CPU 密集型任務上,多線程並不能實現真正的並行。創建和銷毀線程開銷比進程小。
- 協程 (Coroutines): 使用者空間的協作式多任務。協程是輕量級的,由程式自身控制切換,而不是由操作系統調度。它們在一個線程內執行,通過
await
或yield from
主動讓出控制權,允許其他協程運行。特別適合處理 大量 I/O 密集型 任務,因為切換開銷非常小。協程不能利用多核 CPU 進行並行計算。
1. 多進程 (Multiprocessing)#
使用 multiprocessing
模組。請注意,在 Windows 上運行多進程代碼時,通常需要將主邏輯放在 if __name__ == "__main__":
區塊中。
import multiprocessing
import time
import os
def process_task(name, duration):
"""模擬一個耗時任務"""
print(f"進程 {name} (PID: {os.getpid()}) 開始,耗時 {duration} 秒...")
time.sleep(duration)
print(f"進程 {name} (PID: {os.getpid()}) 結束")
if __name__ == "__main__": # Windows 需要這個保護
print("--- 開始多進程示例 ---")
start_time = time.time()
# 創建兩個進程
p1 = multiprocessing.Process(target=process_task, args=("進程A", 2))
p2 = multiprocessing.Process(target=process_task, args=("進程B", 3))
# 啟動進程
p1.start()
p2.start()
# 等待所有進程完成
p1.join()
p2.join()
end_time = time.time()
print(f"--- 多進程示例結束,總耗時: {end_time - start_time:.2f} 秒 ---")
# 理論上,如果CPU是多核且兩個任務都是CPU密集型,總耗時會接近max(2, 3)=3秒
# 但我們這裡用sleep模擬IO,所以總耗時仍然會接近max(2, 3)=3秒,但關鍵在於它們是並行執行的
運行結果分析:
進程 A 和進程 B 幾乎同時開始,並且它們的PID 是不同的。總耗時接近兩個任務中耗時最長的那個。這表明它們是並行執行的,各自在獨立的進程中運行。
2. 多線程 (Multithreading)#
使用 threading
模組。線程在同一個進程內運行,共享記憶體。
import threading
import time
import os
def thread_task(name, duration):
"""模擬一個耗時任務"""
# 注意:這裡模擬的是IO密集型任務(等待)
print(f"線程 {name} (PID: {os.getpid()}) 開始,耗時 {duration} 秒...")
time.sleep(duration)
print(f"線程 {name} (PID: {os.getpid()}) 結束")
if __name__ == "__main__": # 儘管threading不強制,但習慣上也會放在這裡
print("--- 開始多線程示例 ---")
start_time = time.time()
# 創建兩個線程
t1 = threading.Thread(target=thread_task, args=("線程A", 2))
t2 = threading.Thread(target=thread_task, args=("線程B", 3))
# 啟動線程
t1.start()
t2.start()
# 等待所有線程完成
t1.join()
t2.join()
end_time = time.time()
print(f"--- 多線程示例結束,總耗時: {end_time - start_time:.2f} 秒 ---")
# 因為是模擬IO(sleep會釋放GIL),所以總耗時會接近max(2, 3)=3秒。
# 如果是純CPU計算任務,受GIL影響,總耗時會接近2+3=5秒。
運行結果分析:
線程 A 和線程 B 幾乎同時開始,但它們的PID 是相同的,因為它們在同一個進程內。總耗時接近兩個任務中耗時最長的那個,因為 time.sleep()
在等待時會釋放 GIL,允許其他線程運行。
3. 協程 (Coroutines) - 使用 asyncio
#
使用 asyncio
模組。協程是單線程的,通過事件循環 (event loop) 來調度。await
是關鍵,它表示當前協程暫停執行,將控制權交還給事件循環,允許事件循環去運行其他準備好的協程。
import asyncio
import time
import os
async def async_task(name, duration):
"""模擬一個異步耗時任務"""
# 注意:這裡必須使用 await asyncio.sleep() 來模擬異步等待
print(f"協程 {name} (PID: {os.getpid()}) 開始,耗時 {duration} 秒...")
await asyncio.sleep(duration) # <-- 關鍵:協程在這裡讓出控制權
print(f"協程 {name} (PID: {os.getpid()}) 結束")
async def main():
"""主協程,創建並運行其他協程"""
print("--- 開始協程示例 ---")
start_time = time.time()
# 創建兩個協程對象
task1 = async_task("協程A", 2)
task2 = async_task("協程B", 3)
# 使用 asyncio.gather 並發運行協程,並等待它們完成
await asyncio.gather(task1, task2)
end_time = time.time()
print(f"--- 協程示例結束,總耗時: {end_time - start_time:.2f} 秒 ---")
# 總耗時接近max(2, 3)=3秒,因為它們在等待時可以切換執行。
if __name__ == "__main__":
asyncio.run(main()) # 啟動事件循環並運行主協程
運行結果分析:
協程 A 和協程 B 幾乎同時開始,它們的PID 也是相同的(因為它們都在同一個進程的同一個線程裡運行)。總耗時接近兩個任務中耗時最長的那個。這是因為當協程 A 執行到 await asyncio.sleep(2)
時,它會將控制權交給事件循環,事件循環發現協程 B 已經準備好運行(它也剛啟動),就會切換到協程 B 執行。當協程 B 也遇到 await asyncio.sleep(3)
時,同樣讓出控制權。事件循環會等待哪個協程先完成等待,然後恢復其執行。
4. 總結與對比#
特性 | 多進程 (multiprocessing) | 多線程 (threading) | 協程 (asyncio) |
---|---|---|---|
並行 / 並發 | 並行 (Parallelism) - 真正利用多核 | 並發 (Concurrency) - IO 密集型效率高,CPU 密集型受 GIL 限制 | 並發 (Concurrency) - 協作式,單線程內切換 |
資源消耗 | 重 (獨立記憶體空間) | 輕 (共享記憶體空間) | 最輕 (使用者空間,棧開銷小) |
切換方式 | 操作系統調度 (搶佔式) | 操作系統調度 (搶佔式) | 程式控制 (await /yield from ) (協作式) |
適用場景 | CPU 密集型任務,需要充分利用多核 | I/O 密集型任務,簡單的並發需求 | 大量 I/O 密集型任務,高並發連接 |
數據共享 | 需要 IPC (管道、佇列等),複雜 | 共享記憶體,需要鎖等同步機制,較複雜 | 共享記憶體,通常通過參數傳遞或共享物件,需注意協程安全 |
錯誤處理 | 一個進程崩潰不影響其他進程 | 一個線程崩潰可能導致整個進程崩潰 | 一個協程異常通常只影響自身,但未捕獲可能影響事件循環 |
Python 限制 | 不受 GIL 影響 (每個進程有自己的解釋器) | 受 GIL 影響 (同一時刻只有一個線程執行 Python 字節碼) | 不受 GIL 直接影響 (因為只在一個線程內),但 CPU 密集型任務會阻塞整個事件循環 |
5. 協程的 async await asyncio#
如果沒寫 asyncio
、await
、async
會發生什麼?#
async
、await
和 asyncio
是協程協作模型的語法糖和運行時環境,缺一不可
-
沒寫
async def
:- 如果您定義一個函數,但忘了寫
async def
,它就是一個普通的同步函數。 - 後果:
- 無法使用
await
: 如果在這個函數內部使用了await
,Python 會直接報SyntaxError
錯誤,因為await
只能在async def
函數內部使用。 - 無法被
await
: 這個普通函數調用後直接返回結果(或拋出異常),它不是一個可等待對象,你不能在其他async def
函數內部await
它。
- 無法使用
- 如果您定義一個函數,但忘了寫
# 錯誤示例:在普通函數中使用 await
# def blocking_function(): # 缺少 async
# await asyncio.sleep(1) # SyntaxError: 'await' not in async function
# print("這將無法運行")
-
沒寫
await
:- 如果在一個
async def
函數內部,調用了一个可等待對象(比如另一個協程函數返回的對象Workspace_url(...)
或asyncio.sleep(...)
),但忘了寫await
。 - 後果:
-
不會等待: 當前協程會立即繼續執行,不會暫停,也不會等待那個可等待對象的結果。
-
可等待對象被忽略 (或產生警告): 調用
Workspace_url(...)
或asyncio.sleep(...)
會返回一個協程對象,但因為前面沒有await
,這個協程對象不會被提交給事件循環調度執行。它就像一個被創建出來但沒有被使用的變量一樣,靜靜地躺在那裡,直到被垃圾回收。這通常會導致預期的異步操作根本沒有發生。Python 解釋器可能會發出一個運行時警告,提示你有一個協程從未被 awaited。這是asyncio
編程中一個非常常見的錯誤來源。
-
- 如果在一個
-
沒寫
asyncio.run()
(或等效的事件循環啟動代碼):- 如果您定義了一個頂層的
async def main()
函數,但只是簡單地調用main()
。 - 後果:
- 異步代碼不會運行: 調用
main()
只會返回一個協程對象。這個協程對象包含了所有異步邏輯的代碼,但它沒有被提交給任何事件循環來執行。事件循環是異步代碼運行所必需的 “引擎”。沒有啟動事件循環並把頂層協程交給它,任何async def
函數內部的代碼(包括await
調用)都不會被執行。
- 異步代碼不會運行: 調用
async def my_app(): print("我是一个异步应用") await asyncio.sleep(1) print("我本该运行完毕") # 錯誤示例:直接調用 async 函數 # my_app() # 調用後返回一個協程對象,但不會打印任何東西 # print("程序結束") # 這句話會立即執行 # 正確做法是: # asyncio.run(my_app()) # 啟動事件循環並運行 my_app 協程
- 如果您定義了一個頂層的
async def
標記函數是異步的,使其調用返回協程對象。await
在異步函數內部使用,標記一個暫停點,將控制權交還給事件循環,並等待一個可等待對象完成。asyncio
(特別是事件循環,通過asyncio.run
或其他方式啟動) 是執行協程的運行時環境,它接收協程對象,調度它們的執行,並在await
點之間切換。
6. 協程的 create_task#
怎麼才能讓多個協程同時開始運行,而不是一個 await
完再 await
另一個(那樣是串行了)?
asyncio.create_task()
就是解決這個問題的關鍵函數之一。
asyncio.create_task(coro)
的作用#
- 功能: 將一個協程對象 (
coro
) 包裝成一個Task
對象,並將其安排到當前正在運行的事件循環中等待執行。 - 返回值: 返回一個
asyncio.Task
對象。
Task
是什麼?#
asyncio.Task
是asyncio
提供的核心對象之一。- 可以被看作是事件循環中正在運行(或已安排好要運行)的一个协程的句柄或代表。
Task
對象本身是可等待的 (awaitable)。可以await
一個 Task 對象來等待它包裝的協程完成並獲取結果。Task
對象提供了檢查協程狀態(是否完成、是否被取消)、獲取結果、獲取異常或取消協程的方法。
asyncio.create_task()
vs 直接 await
#
-
直接
await
一個協程調用:async def main(): print("main start") await some_async_function() # <-- 當前 main 協程會在這裡暫停,直到 some_async_function 完成 print("main end") async def some_async_function(): print("some_async_function start") await asyncio.sleep(2) print("some_async_function end") # 執行順序: main start -> some_async_function start -> 等待2秒 -> some_async_function end -> main end # main 函數在等待 some_async_function 完成,是串行等待。
直接
await
意味著當前協程必須等待被await
的可等待對象完成,才能繼續往下執行。 -
使用
asyncio.create_task()
:async def main(): print("main start") # 創建一個 Task,將 some_async_function 安排到事件循環 task = asyncio.create_task(some_async_function()) print("some_async_function 已安排為 Task,main 繼續執行") # 此時 main 協程會立即往下執行,不會等待 task 完成 # task 會在事件循環中與 main 並發運行 # 如果 main 函數不在這裡 await task,它會很快運行完畢 # 為了確保 main 等待 task 真正完成,我們需要在某個地方 await task await task # <-- main 在這裡等待 task 完成 print("task 完成,main end") async def some_async_function(): print("some_async_function start") await asyncio.sleep(2) print("some_async_function end") # 執行順序: main start -> some_async_function 已安排... -> main 繼續執行 -> (事件循環切換) some_async_function start -> (事件循環切換) main 繼續執行... -> (事件循環切換) 等待2秒 -> some_async_function end -> (事件循環切換) task 完成,main end # main 和 task 是並發運行的。main 在調度 task 後沒有立即阻塞,而是繼續往下執行了。 # 最後的 await task 確保 main 不會在 task 完成前結束。
asyncio.create_task()
告訴事件循環:“這是另一個協程任務,把它加到你的待辦列表裡,在合適的時候運行它。” 調用create_task
的協程不會暫停,它會立即返回一個Task
對象,然後繼續執行自己的代碼。
create_task()
的典型應用場景#
- 啟動 “後台” 任務: 想讓某個協程開始執行,但不關心何時完成,或者只是偶爾需要檢查它的狀態。比如,啟動一個日誌記錄協程、一個監控協程等。
- 需要獨立管理任務時: 可能需要獲取 Task 對象來取消一個正在運行的任務 (
task.cancel()
),或者檢查它是否已經完成 (task.done()
),或者獲取結果 (task.result()
)。 - 與
asyncio.gather()
結合使用:asyncio.gather()
可以直接接受協程對象,也可以接受 Task 對象。雖然直接傳協程對象更簡潔,但在需要先創建 Task 對象進行一些預處理或管理時,create_task
就很有用。
asyncio.gather()
與 create_task()
的配合#
用 create_task
啟動多個協程,然後用 asyncio.gather
一起等待它們完成。
async def download_page(url):
print(f"開始下載: {url}")
await asyncio.sleep(random.randint(1, 3)) # 模擬下載時間
print(f"下載完成: {url}")
return f"Content of {url[:20]}..."
async def main_with_gather():
urls = ["url1", "url2", "url3"] # 真實的 URL
print("main_with_gather: 準備創建任務")
# 創建 Task 列表
tasks = []
for url in urls:
# 調用 download_page 返回協程對象,然後用 create_task 包成 Task
task = asyncio.create_task(download_page(url))
tasks.append(task)
print("main_with_gather: 所有任務已創建")
# await asyncio.gather 等待 tasks 列表中的所有 Task 完成
# 在等待期間,事件循環會調度 tasks 中的協程並發運行
results = await asyncio.gather(*tasks)
print("main_with_gather: 所有任務已完成")
for result in results:
print(f"結果: {result}")
import random
if __name__ == "__main__":
asyncio.run(main_with_gather())
asyncio.create_task()
將每個 download_page(url)
協程變成了可以在事件循環中獨立運行的 Task。await asyncio.gather(*tasks)
則讓 main_with_gather
協程暫停,直到所有這些 Task 都完成。在這段等待期間,事件循環會負責在這些 Task 之間切換執行,從而實現並發下載。
注意: create_task()
必須在事件循環已經運行之後調用。在使用 asyncio.run(main())
進入 main
協程時,事件循環就已經在運行了,所以可以在 main
或由 main
調用的其他協程中安全地使用 create_task
。
Task 對象的一些方法#
task.done()
: 檢查 Task 是否已完成 (包括正常完成、拋出異常或被取消)。task.result()
: 獲取 Task 的結果。如果在 Task 完成前調用,會拋出InvalidStateError
;如果 Task 因異常完成,會重新拋出該異常。task.exception()
: 獲取 Task 完成時的異常。如果正常完成或未完成,返回None
。task.cancel()
: 請求取消 Task。Task 內部會收到一個asyncio.CancelledError
異常。Task 需要自己處理這個異常來實現優雅取消。task.cancelled()
: 檢查 Task 是否被取消。
7. 協程通用模板#
# 這是一個 Python 協程 (asyncio) 的通用代碼模板
import asyncio
import time
import random # 導入 random 用於模擬不同任務的耗時
# =============================================================================
# 步驟 1: 定義獨立的異步任務 (協程函數)
# 使用 async def 關鍵字定義,表示這是一個協程函數。
# 協程函數內部會使用 await 調用其他可等待對象(如異步 I/O 操作、asyncio 提供的異步工具或其它協程)。
# 這些函數通常包含實際的異步工作負載。
# =============================================================================
async def async_worker_task(task_id: int, simulate_duration: int):
"""
定義一個異步工作任務的協程函數。
Args:
task_id: 任務的唯一標識符。
simulate_duration: 模擬任務需要花費的秒數(實際是 await asyncio.sleep 的時間)。
"""
# 任務開始時的打印,通常會看到多個任務的“開始”打印幾乎同時出現
print(f"[任務 {task_id}] 開始執行, 预计耗時 {simulate_duration} 秒...")
# 使用 await 來執行一個異步操作。
# asyncio.sleep() 是一個可等待對象,await 它時,當前協程會暫停,
# 控制權會交給事件循環,事件循環可以去運行其他準備好的協程。
# 這模擬了等待一個異步 I/O 操作(如網路響應、資料庫查詢結果等)的過程,期間不阻塞整個線程。
await asyncio.sleep(simulate_duration)
# 任務完成時的打印
print(f"[任務 {task_id}] 執行完畢。")
# 任務可以返回一個結果
return f"任務 {task_id} 完成,耗時 {simulate_duration} 秒。"
# =============================================================================
# 步驟 2: 定義主異步函數 (Orchestrator / 入口)
# 這是整個異步程序的入口點,也是一个 async def 函數。
# 它負責創建、組織和調度其他的協程任務。
# 通常會在這裡使用 asyncio.gather 或 asyncio.create_task 來管理多個任務的並發執行。
# =============================================================================
async def main_async_orchestrator():
"""
主異步函數,負責創建和運行 worker 任務。
"""
print("主協調器: 異步流程開始")
start_time = time.time()
# 準備一些任務的參數
task_configurations = [
(1, random.randint(1, 4)), # 任務1,隨機耗時1-4秒
(2, random.randint(2, 5)), # 任務2,隨機耗時2-5秒
(3, random.randint(1, 3)), # 任務3,隨機耗時1-3秒
(4, random.randint(3, 6)), # 任務4,隨機耗時3-6秒
]
# =========================================================================
# 步驟 3: 創建協程對象
# 調用 async def 函數會返回一個協程對象,此時協程內部的代碼還沒有開始執行。
# =========================================================================
# 創建一個協程對象列表
coroutine_objects = [
async_worker_task(task_id, duration)
for task_id, duration in task_configurations
]
print(f"主協調器: 已創建 {len(coroutine_objects)} 個協程對象。")
# =========================================================================
# 步驟 4: 並發執行協程任務
# 使用 asyncio.gather() 是最常用的方法,它可以並發地運行列表中的所有“可等待對象”
# (包括協程對象和 Task 對象),並等待它們全部完成。
# await asyncio.gather(...) 會暫停當前主協程,並將控制權交給事件循環,
# 事件循環會同時調度 coroutine_objects 中的協程運行。
# 當所有協程都完成後,gather 返回一個包含所有協程返回值的列表。
# =========================================================================
print("主協調器: 使用 asyncio.gather 並發運行任務...")
# await 是這裡的關鍵,主協程會在這裡等待所有子任務完成
results = await asyncio.gather(*coroutine_objects)
print("主協調器: 所有並發任務已完成。")
print("主協調器: 所有任務的結果如下:")
for res in results:
print(f"- {res}")
# =========================================================================
# 可選: 演示使用 asyncio.create_task 創建任務而不立即等待
# 如果你不想等待某個任務,只想讓它在後台運行,可以使用 asyncio.create_task()
# background_task = asyncio.create_task(some_other_async_task())
# print("主協調器: 啟動了一個後台任務...")
# 注意:如果主協程在後台任務完成前結束,後台任務可能會被取消,
# 如果需要確保後台任務完成,你可能需要在某個地方 await background_task
# =========================================================================
end_time = time.time()
print(f"主協調器: 異步流程結束。總耗時: {end_time - start_time:.2f} 秒")
# =============================================================================
# 步驟 5: 程序的入口點
# 使用標準的 if __name__ == "__main__": 保護塊。
# 在這裡調用 asyncio.run() 來啟動事件循環,並運行頂層的主異步函數。
# asyncio.run() 是 Python 3.7+ 推薦的方式,它會負責創建事件循環、
# 運行傳入的協程直到完成,並最後關閉事件循環。
# =============================================================================
if __name__ == "__main__":
print("程序開始")
# 調用 asyncio.run() 來執行我們的主異步函數 main_async_orchestrator
asyncio.run(main_async_orchestrator())
print("程序已完全退出。")
程序開始
主協調器: 異步流程開始
主協調器: 已創建 4 個協程對象。
主協調器: 使用 asyncio.gather 並發運行任務...
[任務 1] 開始執行, 预计耗時 2 秒...
[任務 2] 開始執行, 预计耗時 5 秒...
[任務 3] 開始執行, 预计耗時 2 秒...
[任務 4] 開始執行, 预计耗時 5 秒...
[任務 1] 執行完畢。
[任務 3] 執行完畢。
[任務 2] 執行完畢。
[任務 4] 執行完畢。
主協調器: 所有並發任務已完成。
主協調器: 所有任務的結果如下:
- 任務 1 完成,耗時 2 秒。
- 任務 2 完成,耗時 5 秒。
- 任務 3 完成,耗時 2 秒。
- 任務 4 完成,耗時 5 秒。
主協調器: 異步流程結束。總耗時: 5.00 秒
程序已完全退出。
此文由 Mix Space 同步更新至 xLog
原始鏈接為 https://blog.kanes.top/posts/default/understanding-python-multithreading-multiprocessing-coroutines