banner
kanes

kanes

Pythonのマルチスレッド、マルチプロセス、コルーチンの基礎から応用まで

  1. マルチプロセッシング (Multiprocessing): オペレーティングシステムレベルの並行性。各プロセスは独自のメモリ空間を持ち、プロセス間で影響を与えない。CPU 集中型 タスクの実行に適しており、マルチコア CPU を十分に活用できる。プロセスの作成と破棄にはコストがかかる。
  2. マルチスレッド (Multithreading): 同じプロセス内で複数の実行フローを作成する。スレッドはプロセスのメモリ空間を共有する。I/O 集中型 タスク(ネットワークリクエスト、ファイルの読み書きなど)の実行に適している。I/O を待っている間、他のスレッドが引き続き実行できる。しかし、Python では GIL (Global Interpreter Lock) の制約を受けており、同時に実行できるのは 1 つのスレッドだけなので、CPU 集中型タスクではマルチスレッドによる真の並行性は実現できない。スレッドの作成と破棄のコストはプロセスよりも小さい。
  3. コルーチン (Coroutines): ユーザースペースの協調型マルチタスク。コルーチンは軽量で、プログラム自身が切り替えを制御し、オペレーティングシステムによってスケジュールされるのではない。これらは 1 つのスレッド内で実行され、await または yield from によって制御を積極的に放棄し、他のコルーチンが実行できるようにする。特に大量の I/O 集中型 タスクの処理に適しており、切り替えコストが非常に小さい。コルーチンはマルチコア CPU を利用して並行計算を行うことはできない。

1. マルチプロセッシング (Multiprocessing)#

multiprocessing モジュールを使用します。注意点として、Windows 上でマルチプロセスコードを実行する際は、通常、メインロジックを if __name__ == "__main__": ブロック内に置く必要があります。

import multiprocessing
import time
import os

def process_task(name, duration):
    """時間のかかるタスクをシミュレート"""
    print(f"プロセス {name} (PID: {os.getpid()}) が開始、所要時間 {duration} 秒...")
    time.sleep(duration)
    print(f"プロセス {name} (PID: {os.getpid()}) が終了")

if __name__ == "__main__": # Windows ではこの保護が必要
    print("--- マルチプロセスの例を開始 ---")
    start_time = time.time()

    # 2つのプロセスを作成
    p1 = multiprocessing.Process(target=process_task, args=("プロセスA", 2))
    p2 = multiprocessing.Process(target=process_task, args=("プロセスB", 3))

    # プロセスを開始
    p1.start()
    p2.start()

    # すべてのプロセスが完了するのを待つ
    p1.join()
    p2.join()

    end_time = time.time()
    print(f"--- マルチプロセスの例が終了、総所要時間: {end_time - start_time:.2f} 秒 ---")
    # 理論的には、CPU がマルチコアで、2つのタスクがどちらも CPU 集中型の場合、総所要時間は max(2, 3)=3秒に近くなる
    # しかし、ここでは sleep を使って I/O をシミュレートしているため、総所要時間は依然として max(2, 3)=3秒に近くなるが、重要なのはそれらが並行して実行されていること

実行結果の分析:
プロセス A とプロセス B はほぼ同時に開始し、PID は異なる総所要時間は 2 つのタスクのうち最も長いものに近い。これは、それらが並行して実行され、それぞれ独立したプロセスで動作していることを示しています。


2. マルチスレッド (Multithreading)#

threading モジュールを使用します。スレッドは同じプロセス内で実行され、メモリを共有します。

import threading
import time
import os

def thread_task(name, duration):
    """時間のかかるタスクをシミュレート"""
    # 注意:ここでは I/O 集中型タスク(待機)をシミュレートしています
    print(f"スレッド {name} (PID: {os.getpid()}) が開始、所要時間 {duration} 秒...")
    time.sleep(duration)
    print(f"スレッド {name} (PID: {os.getpid()}) が終了")

if __name__ == "__main__": # threading は強制ではありませんが、慣習的にここに置きます
    print("--- マルチスレッドの例を開始 ---")
    start_time = time.time()

    # 2つのスレッドを作成
    t1 = threading.Thread(target=thread_task, args=("スレッドA", 2))
    t2 = threading.Thread(target=thread_task, args=("スレッドB", 3))

    # スレッドを開始
    t1.start()
    t2.start()

    # すべてのスレッドが完了するのを待つ
    t1.join()
    t2.join()

    end_time = time.time()
    print(f"--- マルチスレッドの例が終了、総所要時間: {end_time - start_time:.2f} 秒 ---")
    # I/O をシミュレートしているため(sleep は GIL を解放します)、総所要時間は max(2, 3)=3秒に近くなります。
    # もし純粋な CPU 計算タスクであれば、GIL の影響を受け、総所要時間は 2+3=5秒に近くなるでしょう。

実行結果の分析:
スレッド A とスレッド B はほぼ同時に開始しますが、PID は同じです。なぜなら、同じプロセス内で実行されているからです。総所要時間は 2 つのタスクのうち最も長いものに近くなります。これは time.sleep() が待機中に GIL を解放し、他のスレッドが実行できるようにするためです。


3. コルーチン (Coroutines) - asyncio を使用#

asyncio モジュールを使用します。コルーチンは単一スレッドで、イベントループによってスケジュールされます。await が重要で、これは現在のコルーチンが実行を一時停止し、制御をイベントループに返し、他の準備が整ったコルーチンを実行できるようにします。

import asyncio
import time
import os

async def async_task(name, duration):
    """非同期の時間のかかるタスクをシミュレート"""
    # 注意:ここでは必ず await asyncio.sleep() を使用して非同期待機をシミュレートする必要があります
    print(f"コルーチン {name} (PID: {os.getpid()}) が開始、所要時間 {duration} 秒...")
    await asyncio.sleep(duration) # <-- 重要:コルーチンはここで制御を放棄します
    print(f"コルーチン {name} (PID: {os.getpid()}) が終了")

async def main():
    """メインコルーチン、他のコルーチンを作成して実行します"""
    print("--- コルーチンの例を開始 ---")
    start_time = time.time()

    # 2つのコルーチンオブジェクトを作成
    task1 = async_task("コルーチンA", 2)
    task2 = async_task("コルーチンB", 3)

    # asyncio.gather を使用してコルーチンを並行して実行し、完了を待つ
    await asyncio.gather(task1, task2)

    end_time = time.time()
    print(f"--- コルーチンの例が終了、総所要時間: {end_time - start_time:.2f} 秒 ---")
    # 総所要時間は max(2, 3)=3秒に近くなります。なぜなら、待機中に切り替えが可能だからです。
    
if __name__ == "__main__":
    asyncio.run(main()) # イベントループを起動し、メインコルーチンを実行します

実行結果の分析:
コルーチン A とコルーチン B はほぼ同時に開始し、PID も同じです(同じプロセスの同じスレッド内で実行されているため)。総所要時間は 2 つのタスクのうち最も長いものに近くなります。これは、コルーチン A が await asyncio.sleep(2) に到達したとき、制御をイベントループに返し、イベントループがコルーチン B を実行する準備ができていることを示しています。コルーチン B も await asyncio.sleep(3) に到達したとき、同様に制御を放棄します。イベントループはどちらのコルーチンが最初に待機を完了するかを待ち、その後実行を再開します。


4. まとめと比較#

特性マルチプロセッシング (multiprocessing)マルチスレッド (threading)コルーチン (asyncio)
並行 / 並列並列 (Parallelism) - 真のマルチコア利用並行 (Concurrency) - I/O 集中型の効率が高く、CPU 集中型は GIL に制約される並行 (Concurrency) - 協調型、単一スレッド内での切り替え
リソース消費重 (独立したメモリ空間)軽 (共有メモリ空間)最も軽 (ユーザースペース、スタックオーバーヘッドが小さい)
切り替え方式オペレーティングシステムのスケジューリング (プリエンプティブ)オペレーティングシステムのスケジューリング (プリエンプティブ)プログラム制御 (await/yield from) (協調型)
適用シーンCPU 集中型タスク、マルチコアを十分に活用I/O 集中型タスク、単純な並行性のニーズ大量の I/O 集中型タスク、高並行接続
データ共有IPC (パイプ、キューなど) が必要で複雑共有メモリ、ロックなどの同期メカニズムが必要で比較的複雑共有メモリ、通常はパラメータ渡しや共有オブジェクトを通じて、コルーチン安全に注意
エラー処理1 つのプロセスがクラッシュしても他のプロセスには影響しない1 つのスレッドがクラッシュするとプロセス全体がクラッシュする可能性がある1 つのコルーチンの例外は通常自身にのみ影響するが、未捕捉の場合はイベントループに影響を与える可能性がある
Python 制限GIL の影響を受けない (各プロセスに独自のインタプリタがある)GIL の影響を受ける (同時に 1 つのスレッドのみが Python バイトコードを実行)GIL の直接的な影響を受けない (単一スレッド内でのみ実行されるため)、ただし CPU 集中型タスクはイベントループ全体をブロックする

5. コルーチンの async await asyncio#

asyncioawaitasync を書かなかった場合はどうなるか?#

asyncawait、および asyncio はコルーチン協調モデルの構文糖と実行環境であり、欠かせません。

  1. async def を書かなかった場合:

    • 関数を定義したが async def を忘れた場合、それは通常の同期関数になります。
    • 結果:
      • await を使用できない: この関数内で await を使用すると、Python は直接 SyntaxError エラーを報告します。なぜなら、awaitasync def 関数内でのみ使用できるからです。
      • await できない: この通常の関数は呼び出された後、直接結果を返します(または例外をスローします)。それは待機可能なオブジェクトではないため、他の async def 関数内で await することはできません。
    # エラー例:通常の関数内で await を使用
    # def blocking_function(): # async が欠けている
    #     await asyncio.sleep(1) # SyntaxError: 'await' not in async function
    #     print("これは実行できません")
  1. await を書かなかった場合:

    • async def 関数内で、待機可能なオブジェクト(例えば、別のコルーチン関数が返すオブジェクト Workspace_url(...)asyncio.sleep(...))を呼び出したが、await を忘れた場合
    • 結果:
      • 待機しない: 現在のコルーチンはすぐに続行し、待機可能オブジェクトの結果を待たずに実行を続けます。

      • 待機可能オブジェクトが無視される (または警告が発生): Workspace_url(...)asyncio.sleep(...) を呼び出すと、コルーチンオブジェクトが返されますが、前に await がないため、このコルーチンオブジェクトはイベントループにスケジュールされず実行されません。それは作成されたが使用されていない変数のように、静かにそこに横たわり、ガベージコレクションされるまで待っています。これは通常、期待される非同期操作が全く発生しない原因となります。Python インタプリタは、未待機のコルーチンがあることを警告する実行時警告を発する可能性があります。これは asyncio プログラミングにおける非常に一般的なエラーの原因です。

  2. asyncio.run() (または同等のイベントループ起動コード) を書かなかった場合:

    • トップレベルの async def main() 関数を定義したが、単に main() を呼び出した場合。
    • 結果:
      • 非同期コードが実行されない: main() を呼び出すと、単にコルーチンオブジェクトが返されます。このコルーチンオブジェクトにはすべての非同期ロジックのコードが含まれていますが、イベントループに実行を提出されていない。イベントループは非同期コードが実行されるために必要な「エンジン」です。イベントループを起動せず、トップレベルのコルーチンを渡さなければ、async def 関数内のコード(await 呼び出しを含む)は実行されません。
    async def my_app():
        print("私は非同期アプリです")
        await asyncio.sleep(1)
        print("私は実行を終えるはずです")
    
    # エラー例:async 関数を直接呼び出す
    # my_app() # 呼び出すとコルーチンオブジェクトが返されますが、何も印刷されません
    # print("プログラム終了") # この行はすぐに実行されます
    
    # 正しい方法は:
    # asyncio.run(my_app()) # イベントループを起動し、my_app コルーチンを実行します
    
  • async def は関数を非同期としてマークし、その呼び出しがコルーチンオブジェクトを返すようにします。
  • await は非同期関数内で使用され、停止ポイントをマークし、制御をイベントループに返し、待機可能なオブジェクトの完了を待ちます。
  • asyncio (特にイベントループ、asyncio.run や他の方法で起動) はコルーチンを実行するための実行時環境であり、コルーチンオブジェクトを受け取り、それらの実行をスケジュールし、await ポイント間で切り替えます。

6. コルーチンの create_task#

複数のコルーチンを同時に開始するにはどうすればよいでしょうか?1 つの await が完了してから次の await をするのではなく(それでは直列になります)。

asyncio.create_task() がこの問題を解決する重要な関数の 1 つです。

asyncio.create_task(coro) の作用#

  • 機能: 1 つのコルーチンオブジェクト (coro) をラップして、Task オブジェクトにし、現在実行中のイベントループに実行を待機するようにスケジュールします
  • 返り値: asyncio.Task オブジェクトを返します。

Task とは?#

  • asyncio.Taskasyncio が提供するコアオブジェクトの 1 つです。
  • イベントループ内で実行中(または実行予定)のコルーチンのハンドルまたは代表と見なすことができます。
  • Task オブジェクト自体は待機可能です (awaitable)Task オブジェクトを await することで、そのラップされたコルーチンが完了するのを待ち、結果を取得できます。
  • Task オブジェクトは、コルーチンの状態(完了しているか、キャンセルされているか)を確認したり、結果を取得したり、例外を取得したり、コルーチンをキャンセルするためのメソッドを提供します。

asyncio.create_task() と直接 await#

  1. 直接 await でコルーチン呼び出し:

    async def main():
        print("メイン開始")
        await some_async_function() # <-- 現在のメインコルーチンはここで一時停止し、some_async_function の完了を待ちます
        print("メイン終了")
    
    async def some_async_function():
        print("some_async_function 開始")
        await asyncio.sleep(2)
        print("some_async_function 終了")
    
    # 実行順序: メイン開始 -> some_async_function 開始 -> 2秒待機 -> some_async_function 終了 -> メイン終了
    # メイン関数は some_async_function の完了を待機しているため、直列で実行されます。
    

    直接 await することは、現在のコルーチンが待機する必要があることを意味し、待機可能オブジェクトが完了するまで実行を続けることができません。

  2. asyncio.create_task() を使用:

    async def main():
        print("メイン開始")
        # Task を作成し、some_async_function をイベントループにスケジュールします
        task = asyncio.create_task(some_async_function())
        print("some_async_function が Task としてスケジュールされ、メインは続行します")
    
        # この時点でメインコルーチンはすぐに続行し、task の完了を待ちません
        # task はイベントループ内でメインと並行して実行されます
    
        # メイン関数がここで task を await しない場合、すぐに実行が完了します
        # メインが task の完了を確実に待つためには、どこかで await task をする必要があります
        await task # <-- メインはここで task の完了を待ちます
        print("task 完了、メイン終了")
    
    async def some_async_function():
        print("some_async_function 開始")
        await asyncio.sleep(2)
        print("some_async_function 終了")
    
    # 実行順序: メイン開始 -> some_async_function がスケジュールされました... -> メインは続行します -> (イベントループが切り替え) some_async_function 開始 -> (イベントループが切り替え) メインは続行します... -> (イベントループが切り替え) 2秒待機 -> some_async_function 終了 -> (イベントループが切り替え) task 完了、メイン終了
    # メインと task は並行して実行されています。メインは task をスケジュールした後、すぐにブロックされずに続行しました。
    # 最後の await task はメインが task が完了する前に終了しないことを保証します。
    

    asyncio.create_task() は、各 download_page(url) コルーチンを独立して実行できる Task に変えます。await asyncio.gather(*tasks) は、メインコルーチンがすべてのこれらの Task が完了するのを待つように一時停止します。この待機中に、イベントループはこれらの Task の間で切り替えを行い、並行ダウンロードを実現します。

create_task() の典型的な使用シーン#

  1. 「バックグラウンド」タスクの起動: 特定のコルーチンを実行させたいが、完了時期を気にしない、または時々その状態を確認したい場合。例えば、ログ記録コルーチンや監視コルーチンなど。
  2. タスクを独立して管理する必要がある場合: 実行中のタスクをキャンセルする必要があるかもしれません (task.cancel())、またはその完了状態を確認する必要があります (task.done())、または結果を取得する必要があります (task.result())。
  3. asyncio.gather() と組み合わせて使用: asyncio.gather() はコルーチンオブジェクトを直接受け取ることも、Task オブジェクトを受け取ることもできます。コルーチンオブジェクトを直接渡す方が簡潔ですが、タスクオブジェクトを先に作成して何らかの前処理や管理を行う必要がある場合、create_task が非常に便利です。

Task オブジェクトのいくつかのメソッド#

  • task.done(): Task が完了しているかどうかを確認します(正常に完了、例外をスロー、またはキャンセルされたか)。
  • task.result(): Task の結果を取得します。Task が完了する前に呼び出すと InvalidStateError がスローされます。Task が例外で完了した場合、その例外が再スローされます。
  • task.exception(): Task が完了したときの例外を取得します。正常に完了した場合や未完了の場合は None を返します。
  • task.cancel(): Task のキャンセルをリクエストします。Task 内部は asyncio.CancelledError 例外を受け取ります。Task はこの例外を処理して優雅にキャンセルする必要があります。
  • task.cancelled(): Task がキャンセルされたかどうかを確認します。

7. コルーチンの一般的なテンプレート#

# これは Python コルーチン (asyncio) の一般的なコードテンプレートです

import asyncio
import time
import random # 異なるタスクの所要時間をシミュレートするために random をインポート

# =============================================================================
# ステップ 1: 独立した非同期タスク (コルーチン関数) を定義
# async def キーワードを使用して定義し、これはコルーチン関数であることを示します。
# コルーチン関数内では await を使用して他の待機可能オブジェクト(非同期 I/O 操作、asyncio が提供する非同期ツール、または他のコルーチン)を呼び出します。
# これらの関数には通常、実際の非同期作業負荷が含まれます。
# =============================================================================
async def async_worker_task(task_id: int, simulate_duration: int):
    """
    非同期作業タスクのコルーチン関数を定義します。

    Args:
        task_id: タスクの一意の識別子。
        simulate_duration: タスクがかかる秒数をシミュレートします(実際には await asyncio.sleep の時間)。
    """
    # タスク開始時の印刷、通常、複数のタスクの「開始」印刷がほぼ同時に表示されます
    print(f"[タスク {task_id}] 実行を開始、予想所要時間 {simulate_duration} 秒...")

    # await を使用して非同期操作を実行します。
    # asyncio.sleep() は待機可能オブジェクトであり、await すると現在のコルーチンは一時停止し、
    # 制御がイベントループに渡され、イベントループは他の準備が整ったコルーチンを実行できます。
    # これは、ネットワーク応答、データベースクエリ結果などの非同期 I/O 操作を待機するプロセスをシミュレートし、その間スレッド全体をブロックしません。
    await asyncio.sleep(simulate_duration)

    # タスク完了時の印刷
    print(f"[タスク {task_id}] 実行完了。")

    # タスクは結果を返すことができます
    return f"タスク {task_id} 完了、所要時間 {simulate_duration} 秒。"


# =============================================================================
# ステップ 2: メイン非同期関数 (オーケストレーター / エントリ)
# これは全体の非同期プログラムのエントリポイントであり、async def 関数でもあります。
# 他のコルーチンタスクを作成、組織、スケジュールする役割を担います。
# 通常、ここで asyncio.gather または asyncio.create_task を使用して複数のタスクの並行実行を管理します。
# =============================================================================
async def main_async_orchestrator():
    """
    メイン非同期関数、worker タスクを作成して実行します。
    """
    print("主オーケストレーター: 非同期プロセス開始")
    start_time = time.time()

    # タスクのパラメータを準備
    task_configurations = [
        (1, random.randint(1, 4)), # タスク1、1-4秒のランダム所要時間
        (2, random.randint(2, 5)), # タスク2、2-5秒のランダム所要時間
        (3, random.randint(1, 3)), # タスク3、1-3秒のランダム所要時間
        (4, random.randint(3, 6)), # タスク4、3-6秒のランダム所要時間
    ]

    # =========================================================================
    # ステップ 3: コルーチンオブジェクトを作成
    # async def 関数を呼び出すとコルーチンオブジェクトが返されます。この時点ではコルーチン内部のコードはまだ実行されていません。
    # =========================================================================
    # コルーチンオブジェクトのリストを作成
    coroutine_objects = [
        async_worker_task(task_id, duration)
        for task_id, duration in task_configurations
    ]
    print(f"主オーケストレーター: {len(coroutine_objects)} 個のコルーチンオブジェクトを作成しました。")


    # =========================================================================
    # ステップ 4: コルーチンタスクを並行して実行
    # asyncio.gather() を使用することが最も一般的な方法であり、リスト内のすべての「待機可能オブジェクト」
    # (コルーチンオブジェクトや Task オブジェクトを含む)を並行して実行し、すべての完了を待ちます。
    # await asyncio.gather(...) は現在のメインコルーチンを一時停止し、制御をイベントループに渡します。
    # イベントループは coroutine_objects 内のコルーチンを同時にスケジュールして実行します。
    # すべてのコルーチンが完了すると、gather はすべてのコルーチンの返り値を含むリストを返します。
    # =========================================================================
    print("主オーケストレーター: asyncio.gather を使用してタスクを並行して実行します...")
    # await はここでの重要な部分であり、メインコルーチンはここでサブタスクがすべて完了するのを待ちます
    results = await asyncio.gather(*coroutine_objects)

    print("主オーケストレーター: すべての並行タスクが完了しました。")
    print("主オーケストレーター: すべてのタスクの結果は以下の通りです:")
    for res in results:
        print(f"- {res}")

    # =========================================================================
    # オプション: asyncio.create_task を使用してタスクを作成し、すぐに待機しない
    # 特定のタスクを待機せずにバックグラウンドで実行させたい場合、asyncio.create_task() を使用できます。
    # background_task = asyncio.create_task(some_other_async_task())
    # print("主オーケストレーター: バックグラウンドタスクを起動しました...")
    # 注意:メインコルーチンがバックグラウンドタスクが完了する前に終了すると、バックグラウンドタスクはキャンセルされる可能性があります。
    # バックグラウンドタスクが完了することを確実にするには、どこかで await background_task をする必要があります。
    # =========================================================================

    end_time = time.time()
    print(f"主オーケストレーター: 非同期プロセス終了。総所要時間: {end_time - start_time:.2f} 秒")


# =============================================================================
# ステップ 5: プログラムのエントリポイント
# 標準の if __name__ == "__main__": 保護ブロックを使用します。
# ここで asyncio.run() を呼び出してイベントループを起動し、トップレベルのメイン非同期関数を実行します。
# asyncio.run() は Python 3.7+ で推奨される方法であり、イベントループを作成し、
# 渡されたコルーチンを完了するまで実行し、最後にイベントループを閉じます。
# =============================================================================
if __name__ == "__main__":
    print("プログラム開始")
    # asyncio.run() を呼び出して私たちのメイン非同期関数 main_async_orchestrator を実行します
    asyncio.run(main_async_orchestrator())
    print("プログラムは完全に終了しました。")
プログラム開始
主オーケストレーター: 非同期プロセス開始
主オーケストレーター: 4 個のコルーチンオブジェクトを作成しました。
主オーケストレーター: asyncio.gather を使用してタスクを並行して実行します...
[タスク 1] 実行を開始、予想所要時間 2 秒...
[タスク 2] 実行を開始、予想所要時間 5 秒...
[タスク 3] 実行を開始、予想所要時間 2 秒...
[タスク 4] 実行を開始、予想所要時間 5 秒...
[タスク 1] 実行完了。
[タスク 3] 実行完了。
[タスク 2] 実行完了。
[タスク 4] 実行完了。
主オーケストレーター: すべての並行タスクが完了しました。
主オーケストレーター: すべてのタスクの結果は以下の通りです: 
- タスク 1 完了、所要時間 2 秒。    
- タスク 2 完了、所要時間 5 秒。
- タスク 3 完了、所要時間 2 秒。
- タスク 4 完了、所要時間 5 秒。
主オーケストレーター: 非同期プロセス終了。総所要時間: 5.00 秒
プログラムは完全に終了しました。

この記事は Mix Space によって xLog に同期更新されています。
元のリンクは https://blog.kanes.top/posts/default/understanding-python-multithreading-multiprocessing-coroutines


読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。