banner
kanes

kanes

深入淺出Python多線程、多進程、協程

  1. 多進程 (Multiprocessing): 操作系統層面的並行。每個進程有自己獨立的記憶體空間,進程之間互不影響。適合執行 CPU 密集型 任務,可以充分利用多核 CPU。創建和銷毀進程開銷較大。
  2. 多線程 (Multithreading): 在同一個進程內創建多個執行流。線程共享進程的記憶體空間。適合執行 I/O 密集型 任務(如網路請求、檔案讀寫),因為在等待 I/O 時,其他線程可以繼續執行。但在 Python 中受 GIL (Global Interpreter Lock) 的限制,同一時刻只有一個線程能執行 Python 字節碼,所以在 CPU 密集型任務上,多線程並不能實現真正的並行。創建和銷毀線程開銷比進程小。
  3. 協程 (Coroutines): 使用者空間的協作式多任務。協程是輕量級的,由程式自身控制切換,而不是由操作系統調度。它們在一個線程內執行,通過 awaityield from 主動讓出控制權,允許其他協程運行。特別適合處理 大量 I/O 密集型 任務,因為切換開銷非常小。協程不能利用多核 CPU 進行並行計算。

1. 多進程 (Multiprocessing)#

使用 multiprocessing 模組。請注意,在 Windows 上運行多進程代碼時,通常需要將主邏輯放在 if __name__ == "__main__": 區塊中。

import multiprocessing
import time
import os

def process_task(name, duration):
    """模擬一個耗時任務"""
    print(f"進程 {name} (PID: {os.getpid()}) 開始,耗時 {duration} 秒...")
    time.sleep(duration)
    print(f"進程 {name} (PID: {os.getpid()}) 結束")

if __name__ == "__main__": # Windows 需要這個保護
    print("--- 開始多進程示例 ---")
    start_time = time.time()

    # 創建兩個進程
    p1 = multiprocessing.Process(target=process_task, args=("進程A", 2))
    p2 = multiprocessing.Process(target=process_task, args=("進程B", 3))

    # 啟動進程
    p1.start()
    p2.start()

    # 等待所有進程完成
    p1.join()
    p2.join()

    end_time = time.time()
    print(f"--- 多進程示例結束,總耗時: {end_time - start_time:.2f} 秒 ---")
    # 理論上,如果CPU是多核且兩個任務都是CPU密集型,總耗時會接近max(2, 3)=3秒
    # 但我們這裡用sleep模擬IO,所以總耗時仍然會接近max(2, 3)=3秒,但關鍵在於它們是並行執行的

運行結果分析:
進程 A 和進程 B 幾乎同時開始,並且它們的PID 是不同的。總耗時接近兩個任務中耗時最長的那個。這表明它們是並行執行的,各自在獨立的進程中運行。


2. 多線程 (Multithreading)#

使用 threading 模組。線程在同一個進程內運行,共享記憶體。

import threading
import time
import os

def thread_task(name, duration):
    """模擬一個耗時任務"""
    # 注意:這裡模擬的是IO密集型任務(等待)
    print(f"線程 {name} (PID: {os.getpid()}) 開始,耗時 {duration} 秒...")
    time.sleep(duration)
    print(f"線程 {name} (PID: {os.getpid()}) 結束")

if __name__ == "__main__": # 儘管threading不強制,但習慣上也會放在這裡
    print("--- 開始多線程示例 ---")
    start_time = time.time()

    # 創建兩個線程
    t1 = threading.Thread(target=thread_task, args=("線程A", 2))
    t2 = threading.Thread(target=thread_task, args=("線程B", 3))

    # 啟動線程
    t1.start()
    t2.start()

    # 等待所有線程完成
    t1.join()
    t2.join()

    end_time = time.time()
    print(f"--- 多線程示例結束,總耗時: {end_time - start_time:.2f} 秒 ---")
    # 因為是模擬IO(sleep會釋放GIL),所以總耗時會接近max(2, 3)=3秒。
    # 如果是純CPU計算任務,受GIL影響,總耗時會接近2+3=5秒。

運行結果分析:
線程 A 和線程 B 幾乎同時開始,但它們的PID 是相同的,因為它們在同一個進程內。總耗時接近兩個任務中耗時最長的那個,因為 time.sleep() 在等待時會釋放 GIL,允許其他線程運行。


3. 協程 (Coroutines) - 使用 asyncio#

使用 asyncio 模組。協程是單線程的,通過事件循環 (event loop) 來調度。await 是關鍵,它表示當前協程暫停執行,將控制權交還給事件循環,允許事件循環去運行其他準備好的協程。

import asyncio
import time
import os

async def async_task(name, duration):
    """模擬一個異步耗時任務"""
    # 注意:這裡必須使用 await asyncio.sleep() 來模擬異步等待
    print(f"協程 {name} (PID: {os.getpid()}) 開始,耗時 {duration} 秒...")
    await asyncio.sleep(duration) # <-- 關鍵:協程在這裡讓出控制權
    print(f"協程 {name} (PID: {os.getpid()}) 結束")

async def main():
    """主協程,創建並運行其他協程"""
    print("--- 開始協程示例 ---")
    start_time = time.time()

    # 創建兩個協程對象
    task1 = async_task("協程A", 2)
    task2 = async_task("協程B", 3)

    # 使用 asyncio.gather 並發運行協程,並等待它們完成
    await asyncio.gather(task1, task2)

    end_time = time.time()
    print(f"--- 協程示例結束,總耗時: {end_time - start_time:.2f} 秒 ---")
    # 總耗時接近max(2, 3)=3秒,因為它們在等待時可以切換執行。
    
if __name__ == "__main__":
    asyncio.run(main()) # 啟動事件循環並運行主協程

運行結果分析:
協程 A 和協程 B 幾乎同時開始,它們的PID 也是相同的(因為它們都在同一個進程的同一個線程裡運行)。總耗時接近兩個任務中耗時最長的那個。這是因為當協程 A 執行到 await asyncio.sleep(2) 時,它會將控制權交給事件循環,事件循環發現協程 B 已經準備好運行(它也剛啟動),就會切換到協程 B 執行。當協程 B 也遇到 await asyncio.sleep(3) 時,同樣讓出控制權。事件循環會等待哪個協程先完成等待,然後恢復其執行。


4. 總結與對比#

特性多進程 (multiprocessing)多線程 (threading)協程 (asyncio)
並行 / 並發並行 (Parallelism) - 真正利用多核並發 (Concurrency) - IO 密集型效率高,CPU 密集型受 GIL 限制並發 (Concurrency) - 協作式,單線程內切換
資源消耗重 (獨立記憶體空間)輕 (共享記憶體空間)最輕 (使用者空間,棧開銷小)
切換方式操作系統調度 (搶佔式)操作系統調度 (搶佔式)程式控制 (await/yield from) (協作式)
適用場景CPU 密集型任務,需要充分利用多核I/O 密集型任務,簡單的並發需求大量 I/O 密集型任務,高並發連接
數據共享需要 IPC (管道、佇列等),複雜共享記憶體,需要鎖等同步機制,較複雜共享記憶體,通常通過參數傳遞或共享物件,需注意協程安全
錯誤處理一個進程崩潰不影響其他進程一個線程崩潰可能導致整個進程崩潰一個協程異常通常只影響自身,但未捕獲可能影響事件循環
Python 限制不受 GIL 影響 (每個進程有自己的解釋器)受 GIL 影響 (同一時刻只有一個線程執行 Python 字節碼)不受 GIL 直接影響 (因為只在一個線程內),但 CPU 密集型任務會阻塞整個事件循環

5. 協程的 async await asyncio#

如果沒寫 asyncioawaitasync 會發生什麼?#

asyncawaitasyncio 是協程協作模型的語法糖和運行時環境,缺一不可

  1. 沒寫 async def

    • 如果您定義一個函數,但忘了寫 async def,它就是一個普通的同步函數。
    • 後果:
      • 無法使用 await 如果在這個函數內部使用了 await,Python 會直接報 SyntaxError 錯誤,因為 await 只能在 async def 函數內部使用。
      • 無法被 await 這個普通函數調用後直接返回結果(或拋出異常),它不是一個可等待對象,你不能在其他 async def 函數內部 await 它。
    # 錯誤示例:在普通函數中使用 await
    # def blocking_function(): # 缺少 async
    #     await asyncio.sleep(1) # SyntaxError: 'await' not in async function
    #     print("這將無法運行")
  1. 沒寫 await

    • 如果在一個 async def 函數內部,調用了一个可等待對象(比如另一個協程函數返回的對象 Workspace_url(...)asyncio.sleep(...)),但忘了寫 await
    • 後果:
      • 不會等待: 當前協程會立即繼續執行,不會暫停,也不會等待那個可等待對象的結果。

      • 可等待對象被忽略 (或產生警告): 調用 Workspace_url(...)asyncio.sleep(...) 會返回一個協程對象,但因為前面沒有 await,這個協程對象不會被提交給事件循環調度執行。它就像一個被創建出來但沒有被使用的變量一樣,靜靜地躺在那裡,直到被垃圾回收。這通常會導致預期的異步操作根本沒有發生。Python 解釋器可能會發出一個運行時警告,提示你有一個協程從未被 awaited。這是 asyncio 編程中一個非常常見的錯誤來源。

  2. 沒寫 asyncio.run() (或等效的事件循環啟動代碼):

    • 如果您定義了一個頂層的 async def main() 函數,但只是簡單地調用 main()
    • 後果:
      • 異步代碼不會運行: 調用 main() 只會返回一個協程對象。這個協程對象包含了所有異步邏輯的代碼,但它沒有被提交給任何事件循環來執行。事件循環是異步代碼運行所必需的 “引擎”。沒有啟動事件循環並把頂層協程交給它,任何 async def 函數內部的代碼(包括 await 調用)都不會被執行。
    async def my_app():
        print("我是一个异步应用")
        await asyncio.sleep(1)
        print("我本该运行完毕")
    
    # 錯誤示例:直接調用 async 函數
    # my_app() # 調用後返回一個協程對象,但不會打印任何東西
    # print("程序結束") # 這句話會立即執行
    
    # 正確做法是:
    # asyncio.run(my_app()) # 啟動事件循環並運行 my_app 協程
    
  • async def 標記函數是異步的,使其調用返回協程對象。
  • await 在異步函數內部使用,標記一個暫停點,將控制權交還給事件循環,並等待一個可等待對象完成。
  • asyncio (特別是事件循環,通過 asyncio.run 或其他方式啟動) 是執行協程的運行時環境,它接收協程對象,調度它們的執行,並在 await 點之間切換。

6. 協程的 create_task#

怎麼才能讓多個協程同時開始運行,而不是一個 await 完再 await 另一個(那樣是串行了)?

asyncio.create_task() 就是解決這個問題的關鍵函數之一。

asyncio.create_task(coro) 的作用#

  • 功能: 將一個協程對象 (coro) 包裝成一個 Task 對象,並將其安排到當前正在運行的事件循環中等待執行
  • 返回值: 返回一個 asyncio.Task 對象。

Task 是什麼?#

  • asyncio.Taskasyncio 提供的核心對象之一。
  • 可以被看作是事件循環中正在運行(或已安排好要運行)的一个协程的句柄或代表
  • Task 對象本身是可等待的 (awaitable)。可以 await 一個 Task 對象來等待它包裝的協程完成並獲取結果。
  • Task 對象提供了檢查協程狀態(是否完成、是否被取消)、獲取結果、獲取異常或取消協程的方法。

asyncio.create_task() vs 直接 await#

  1. 直接 await 一個協程調用:

    async def main():
        print("main start")
        await some_async_function() # <-- 當前 main 協程會在這裡暫停,直到 some_async_function 完成
        print("main end")
    
    async def some_async_function():
        print("some_async_function start")
        await asyncio.sleep(2)
        print("some_async_function end")
    
    # 執行順序: main start -> some_async_function start -> 等待2秒 -> some_async_function end -> main end
    # main 函數在等待 some_async_function 完成,是串行等待。
    

    直接 await 意味著當前協程必須等待await 的可等待對象完成,才能繼續往下執行。

  2. 使用 asyncio.create_task()

    async def main():
        print("main start")
        # 創建一個 Task,將 some_async_function 安排到事件循環
        task = asyncio.create_task(some_async_function())
        print("some_async_function 已安排為 Task,main 繼續執行")
    
        # 此時 main 協程會立即往下執行,不會等待 task 完成
        # task 會在事件循環中與 main 並發運行
    
        # 如果 main 函數不在這裡 await task,它會很快運行完畢
        # 為了確保 main 等待 task 真正完成,我們需要在某個地方 await task
        await task # <-- main 在這裡等待 task 完成
        print("task 完成,main end")
    
    async def some_async_function():
        print("some_async_function start")
        await asyncio.sleep(2)
        print("some_async_function end")
    
    # 執行順序: main start -> some_async_function 已安排... -> main 繼續執行 -> (事件循環切換) some_async_function start -> (事件循環切換) main 繼續執行... -> (事件循環切換) 等待2秒 -> some_async_function end -> (事件循環切換) task 完成,main end
    # main 和 task 是並發運行的。main 在調度 task 後沒有立即阻塞,而是繼續往下執行了。
    # 最後的 await task 確保 main 不會在 task 完成前結束。
    

    asyncio.create_task() 告訴事件循環:“這是另一個協程任務,把它加到你的待辦列表裡,在合適的時候運行它。” 調用 create_task 的協程不會暫停,它會立即返回一個 Task 對象,然後繼續執行自己的代碼。

create_task() 的典型應用場景#

  1. 啟動 “後台” 任務: 想讓某個協程開始執行,但不關心何時完成,或者只是偶爾需要檢查它的狀態。比如,啟動一個日誌記錄協程、一個監控協程等。
  2. 需要獨立管理任務時: 可能需要獲取 Task 對象來取消一個正在運行的任務 (task.cancel()),或者檢查它是否已經完成 (task.done()),或者獲取結果 (task.result())。
  3. asyncio.gather() 結合使用: asyncio.gather() 可以直接接受協程對象,也可以接受 Task 對象。雖然直接傳協程對象更簡潔,但在需要先創建 Task 對象進行一些預處理或管理時,create_task 就很有用。

asyncio.gather()create_task() 的配合#

create_task 啟動多個協程,然後用 asyncio.gather 一起等待它們完成。

async def download_page(url):
    print(f"開始下載: {url}")
    await asyncio.sleep(random.randint(1, 3)) # 模擬下載時間
    print(f"下載完成: {url}")
    return f"Content of {url[:20]}..."

async def main_with_gather():
    urls = ["url1", "url2", "url3"] # 真實的 URL
    print("main_with_gather: 準備創建任務")

    # 創建 Task 列表
    tasks = []
    for url in urls:
        # 調用 download_page 返回協程對象,然後用 create_task 包成 Task
        task = asyncio.create_task(download_page(url))
        tasks.append(task)
    print("main_with_gather: 所有任務已創建")

    # await asyncio.gather 等待 tasks 列表中的所有 Task 完成
    # 在等待期間,事件循環會調度 tasks 中的協程並發運行
    results = await asyncio.gather(*tasks)
    print("main_with_gather: 所有任務已完成")

    for result in results:
        print(f"結果: {result}")

import random
if __name__ == "__main__":
    asyncio.run(main_with_gather())

asyncio.create_task() 將每個 download_page(url) 協程變成了可以在事件循環中獨立運行的 Task。await asyncio.gather(*tasks) 則讓 main_with_gather 協程暫停,直到所有這些 Task 都完成。在這段等待期間,事件循環會負責在這些 Task 之間切換執行,從而實現並發下載。

注意: create_task() 必須在事件循環已經運行之後調用。在使用 asyncio.run(main()) 進入 main 協程時,事件循環就已經在運行了,所以可以在 main 或由 main 調用的其他協程中安全地使用 create_task

Task 對象的一些方法#

  • task.done(): 檢查 Task 是否已完成 (包括正常完成、拋出異常或被取消)。
  • task.result(): 獲取 Task 的結果。如果在 Task 完成前調用,會拋出 InvalidStateError;如果 Task 因異常完成,會重新拋出該異常。
  • task.exception(): 獲取 Task 完成時的異常。如果正常完成或未完成,返回 None
  • task.cancel(): 請求取消 Task。Task 內部會收到一個 asyncio.CancelledError 異常。Task 需要自己處理這個異常來實現優雅取消。
  • task.cancelled(): 檢查 Task 是否被取消。

7. 協程通用模板#

# 這是一個 Python 協程 (asyncio) 的通用代碼模板

import asyncio
import time
import random # 導入 random 用於模擬不同任務的耗時

# =============================================================================
# 步驟 1: 定義獨立的異步任務 (協程函數)
# 使用 async def 關鍵字定義,表示這是一個協程函數。
# 協程函數內部會使用 await 調用其他可等待對象(如異步 I/O 操作、asyncio 提供的異步工具或其它協程)。
# 這些函數通常包含實際的異步工作負載。
# =============================================================================
async def async_worker_task(task_id: int, simulate_duration: int):
    """
    定義一個異步工作任務的協程函數。

    Args:
        task_id: 任務的唯一標識符。
        simulate_duration: 模擬任務需要花費的秒數(實際是 await asyncio.sleep 的時間)。
    """
    # 任務開始時的打印,通常會看到多個任務的“開始”打印幾乎同時出現
    print(f"[任務 {task_id}] 開始執行, 预计耗時 {simulate_duration} 秒...")

    # 使用 await 來執行一個異步操作。
    # asyncio.sleep() 是一個可等待對象,await 它時,當前協程會暫停,
    # 控制權會交給事件循環,事件循環可以去運行其他準備好的協程。
    # 這模擬了等待一個異步 I/O 操作(如網路響應、資料庫查詢結果等)的過程,期間不阻塞整個線程。
    await asyncio.sleep(simulate_duration)

    # 任務完成時的打印
    print(f"[任務 {task_id}] 執行完畢。")

    # 任務可以返回一個結果
    return f"任務 {task_id} 完成,耗時 {simulate_duration} 秒。"


# =============================================================================
# 步驟 2: 定義主異步函數 (Orchestrator / 入口)
# 這是整個異步程序的入口點,也是一个 async def 函數。
# 它負責創建、組織和調度其他的協程任務。
# 通常會在這裡使用 asyncio.gather 或 asyncio.create_task 來管理多個任務的並發執行。
# =============================================================================
async def main_async_orchestrator():
    """
    主異步函數,負責創建和運行 worker 任務。
    """
    print("主協調器: 異步流程開始")
    start_time = time.time()

    # 準備一些任務的參數
    task_configurations = [
        (1, random.randint(1, 4)), # 任務1,隨機耗時1-4秒
        (2, random.randint(2, 5)), # 任務2,隨機耗時2-5秒
        (3, random.randint(1, 3)), # 任務3,隨機耗時1-3秒
        (4, random.randint(3, 6)), # 任務4,隨機耗時3-6秒
    ]

    # =========================================================================
    # 步驟 3: 創建協程對象
    # 調用 async def 函數會返回一個協程對象,此時協程內部的代碼還沒有開始執行。
    # =========================================================================
    # 創建一個協程對象列表
    coroutine_objects = [
        async_worker_task(task_id, duration)
        for task_id, duration in task_configurations
    ]
    print(f"主協調器: 已創建 {len(coroutine_objects)} 個協程對象。")


    # =========================================================================
    # 步驟 4: 並發執行協程任務
    # 使用 asyncio.gather() 是最常用的方法,它可以並發地運行列表中的所有“可等待對象”
    # (包括協程對象和 Task 對象),並等待它們全部完成。
    # await asyncio.gather(...) 會暫停當前主協程,並將控制權交給事件循環,
    # 事件循環會同時調度 coroutine_objects 中的協程運行。
    # 當所有協程都完成後,gather 返回一個包含所有協程返回值的列表。
    # =========================================================================
    print("主協調器: 使用 asyncio.gather 並發運行任務...")
    # await 是這裡的關鍵,主協程會在這裡等待所有子任務完成
    results = await asyncio.gather(*coroutine_objects)

    print("主協調器: 所有並發任務已完成。")
    print("主協調器: 所有任務的結果如下:")
    for res in results:
        print(f"- {res}")

    # =========================================================================
    # 可選: 演示使用 asyncio.create_task 創建任務而不立即等待
    # 如果你不想等待某個任務,只想讓它在後台運行,可以使用 asyncio.create_task()
    # background_task = asyncio.create_task(some_other_async_task())
    # print("主協調器: 啟動了一個後台任務...")
    # 注意:如果主協程在後台任務完成前結束,後台任務可能會被取消,
    # 如果需要確保後台任務完成,你可能需要在某個地方 await background_task
    # =========================================================================

    end_time = time.time()
    print(f"主協調器: 異步流程結束。總耗時: {end_time - start_time:.2f} 秒")


# =============================================================================
# 步驟 5: 程序的入口點
# 使用標準的 if __name__ == "__main__": 保護塊。
# 在這裡調用 asyncio.run() 來啟動事件循環,並運行頂層的主異步函數。
# asyncio.run() 是 Python 3.7+ 推薦的方式,它會負責創建事件循環、
# 運行傳入的協程直到完成,並最後關閉事件循環。
# =============================================================================
if __name__ == "__main__":
    print("程序開始")
    # 調用 asyncio.run() 來執行我們的主異步函數 main_async_orchestrator
    asyncio.run(main_async_orchestrator())
    print("程序已完全退出。")
程序開始
主協調器: 異步流程開始
主協調器: 已創建 4 個協程對象。
主協調器: 使用 asyncio.gather 並發運行任務...
[任務 1] 開始執行, 预计耗時 2 秒...
[任務 2] 開始執行, 预计耗時 5 秒...
[任務 3] 開始執行, 预计耗時 2 秒...
[任務 4] 開始執行, 预计耗時 5 秒...
[任務 1] 執行完畢。
[任務 3] 執行完畢。
[任務 2] 執行完畢。
[任務 4] 執行完畢。
主協調器: 所有並發任務已完成。
主協調器: 所有任務的結果如下: 
- 任務 1 完成,耗時 2 秒。    
- 任務 2 完成,耗時 5 秒。
- 任務 3 完成,耗時 2 秒。
- 任務 4 完成,耗時 5 秒。
主協調器: 異步流程結束。總耗時: 5.00 秒
程序已完全退出。

此文由 Mix Space 同步更新至 xLog
原始鏈接為 https://blog.kanes.top/posts/default/understanding-python-multithreading-multiprocessing-coroutines


載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。