ZeroMQ 実践#
ZeroMQは、高性能の非同期メッセージライブラリまたは並行フレームワークです。複雑な低レベルのネットワーク通信の詳細を抽象化し、複雑な分散アプリケーションの構築を簡単にするための柔軟なメッセージパターンのセットを提供します。
ZeroMQ の核心理念:パターン、抽象化、パフォーマンス#
従来の中央集権的なメッセージブローカーとは異なり、ZeroMQ はより分散型(ブローカーなし)または分散の設計理念を提唱しています(もちろん、それに基づいてブローカーを構築することも可能です)。その核心的な利点は次のとおりです:
-
メッセージパターン(Messaging Patterns): ZeroMQ は、ユーザーが原始的なソケットを直接操作して接続、送信、受信、エラーハンドリング、再試行などの詳細に頭を悩ませることを避けます。代わりに、いくつかの古典的で即使用可能なメッセージパターンを提供します。各パターンには特定の通信ロジックと拡張性が組み込まれており、シナリオに適したパターンを選択するだけで済みます。
- リクエスト / レスポンス(REQ/REP): 古典的なクライアント / サーバーモデル。REQ がリクエストを送信し、REP がリクエストを受信して応答します。シンプルで直感的です。
- 公開 / 購読(PUB/SUB): データ配信モデル。PUB が特定のトピックにメッセージを公開し、複数の SUB が興味のあるトピックを購読してメッセージを受信します。一対多のブロードキャストを実現します。
- プッシュ / プル(PUSH/PULL): 作業の配布と収集モデル。PUSH がタスクを複数の PULL ワーカーにプッシュし、PULL ワーカーがタスクを引き出して実行します。多対多の負荷分散と結果収集を実現します。
- ペア(PAIR): 最もシンプルなポイントツーポイントモデル。組み込みのパターンロジックはなく、通常は固定の一対一の接続に使用されます。これらのパターンは ZeroMQ の本質であり、原始的なソケットよりもはるかに高い抽象レベルを提供します。
-
ソケットの強化(Sockets on Steroids): ZeroMQ のソケットは従来のソケットとは異なります。これらはパターンのエンドポイントです。ユーザーは、接続の確立、切断、メッセージキューの管理、エラーハンドリングなどの低レベルの詳細を気にする必要はありません。ZeroMQ が内部でそれを処理します。指定されたアドレスに
bind
またはconnect
し、その後send
またはrecv
でメッセージを送信します。 -
高性能とスケーラビリティ: ZeroMQ は、設計当初からパフォーマンスを考慮しています。非同期 I/O、スマートなメッセージバッチ処理とルーティングを使用し、従来のメッセージキューのボトルネックを回避します。その分散型の特性は、中心的なブローカーの単一障害点の圧力と故障リスクがないことも意味します(ブローカーを構築することを選択しない限り)。
ZeroMQ の核心コンポーネント:Context、Socket、Poller#
-
Context(コンテキスト):
- ZeroMQ の実行環境の管理者であり、リソースの割り当てと管理を担当します。これには、低レベルの I/O スレッドが含まれます。
- コンテキストは ZeroMQ の工場として理解できます。すべてのソケットはコンテキストを通じて作成する必要があります(
context.socket(...)
)。 - 通常、アプリケーションまたはスレッドは 1 つのコンテキストを作成するだけで済みます。
-
Socket(ソケット):
- ZeroMQ でメッセージを送受信するための主要なオブジェクトです。
- 各ソケットには特定のタイプ(例:
zmq.REQ
、zmq.PUB
など)があり、このタイプがソケットが従うメッセージパターンを決定します。 - ソケットは特定のアドレスに ** バインド(bind)** できます(通常はサーバー側)し、接続をリッスンします。
- ソケットは特定のアドレスに ** 接続(connect)** できます(通常はクライアント側)し、接続を開始します。
send()
およびrecv()
メソッドを使用してメッセージを送受信します。ZeroMQ のメッセージは ** バイト列(bytes)** であり、send_string()
/recv_string()
は便利な文字列処理を提供します。メッセージは複数のフレームで構成できます(send_multipart()
/recv_multipart()
)。- 注意: デフォルトでは、
socket.send()
およびsocket.recv()
はブロッキングです。バッファが満杯またはメッセージがない場合、現在のスレッドの実行が一時停止します。
-
Poller(ポーラー):
- 従来の同期(ブロッキング)ZeroMQ プログラミングでは、複数のソケットのメッセージを同時にリッスンする必要がある場合、1 つのソケットで
recv()
を呼び出すとブロックされ、他のソケットのメッセージを処理できなくなります。 zmq.Poller
はこの問題を解決するために使用されます。ポーラーに関心のある複数のソケットとイベント(たとえば、zmq.POLLIN
はメッセージが読み取れることを示します)を登録できます。- その後、
poller.poll(timeout)
メソッドを呼び出します。このメソッドはブロックしますが、すべての登録されたソケットを監視します。関心のあるイベントが発生したソケットがあると、poll()
は戻り、どのソケットが準備できているかを通知します(たとえば、recv()
を呼び出すことができます)。 - 重要: 非同期 ZeroMQ(
zmq.asyncio
)を使用する場合、通常は直接zmq.Poller
を使用する必要はありません。非同期フレームワーク(asyncio イベントループ)が低レベルのイベント監視とスケジューリングを担当します。
- 従来の同期(ブロッキング)ZeroMQ プログラミングでは、複数のソケットのメッセージを同時にリッスンする必要がある場合、1 つのソケットで
ZeroMQ の伝送プロトコル:tcp、ipc、inproc#
-
tcp://
:- TCP/IP プロトコルに基づいています。
- プロセス間またはマシン間のネットワーク通信に使用されます。
- アドレス形式:
tcp://host:port
(例:tcp://127.0.0.1:5555
またはtcp://*:5555
)。
-
ipc://
:- ローカルの ** プロセス間通信(IPC)** メカニズム(例:Unix ドメインソケットまたは Windows 命名パイプ)に基づいています。
- 同一マシンの異なるプロセス間の通信に使用されます。
- 通常、
tcp://
よりも速いです。 - アドレス形式:
ipc://pathname
(例:ipc:///tmp/my_socket
)。
-
inproc://
:- プロセス内メモリ転送に基づいています。
- 同一オペレーティングシステムプロセス内の異なるスレッドまたはコルーチン間の通信にのみ使用できます。
- 非常に高速で、ネットワークオーバーヘッドがありません。
- 非常に重要: あるプロセスで
bind
されたinproc://
アドレスは、他のプロセスからは完全に見えず、接続できません。 - アドレス形式:
inproc://transport_name
(例:inproc://my_internal_channel
)。
- コード例
# zmq_server.py - ZeroMQリクエスト応答モードのサーバー側
import zmq
import time
# 1. ZeroMQ Contextオブジェクトを作成
# ContextはZeroMQ実行環境の管理者
context = zmq.Context()
# 2. REP(Reply)ソケットを作成
# REPソケットはリクエストを受信し、応答を送信するために使用されます
socket = context.socket(zmq.REP)
# 3. ソケットをアドレスにバインド
# "tcp://*:5555"はTCPプロトコルを使用し、すべての利用可能なネットワークインターフェースの5555ポートにバインドします
# "*"はすべてのローカルアドレスにバインドし、クライアントが接続しやすくします
bind_address = "tcp://localhost:5555"
socket.bind(bind_address)
print(f"ZeroMQ REPサーバーが起動しました。バインド先: {bind_address}")
print("リクエストの受信を待っています...")
try:
# サーバーは通常無限ループで実行され、リクエストを受信して処理し続けます
while True:
# 4. リクエストを受信
# socket.recv_string()はブロックし、文字列メッセージを受信するまで待機します
message = socket.recv_string()
print(f"リクエストを受信: '{message}'")
# リクエスト処理のシミュレーション
time.sleep(1) # サーバーが処理に少し時間がかかると仮定します
# 5. 応答メッセージを準備
reply_message = f"サーバーがあなたのメッセージを受け取りました: '{message}'"
# 6. 応答を送信
# socket.send_string()は文字列応答を送信します
# REP-REQモードでは、REPソケットは応答を送信する前にリクエストを受信する必要があります
socket.send_string(reply_message)
print(f"応答を送信: '{reply_message}'")
except KeyboardInterrupt:
print("\nCtrl+Cが検出されました。サーバーをシャットダウンしています...")
finally:
# ZeroMQリソースをクリーンアップ
socket.close()
context.term()
print("サーバーが安全にシャットダウンしました。")
# zmq_client.py - ZeroMQリクエスト応答モードのクライアント
import zmq
# 1. ZeroMQ Contextオブジェクトを作成
context = zmq.Context()
# 2. REQ(Request)ソケットを作成
# REQソケットはリクエストを送信し、応答を受信するために使用されます
socket = context.socket(zmq.REQ)
# 3. サーバーのアドレスに接続
# "tcp://localhost:5555"はTCPプロトコルを使用し、ローカルの5555ポートに接続します
# サーバーが別のマシンにある場合は、'localhost'をサーバーの実際のIPアドレスに置き換えてください
connect_address = "tcp://localhost:5555"
socket.connect(connect_address)
print(f"ZeroMQ REQクライアントが起動しました。接続先: {connect_address}")
print("メッセージをサーバーに送信できます。'quit'と入力して終了します。")
try:
# クライアントは通常ループ内で実行され、複数のメッセージを送信できます
while True:
# ユーザー入力を取得
user_input = input("メッセージを入力してください: ")
# 'quit'が入力されたか確認
if user_input.lower() == 'quit':
break
# 4. リクエストを送信
# socket.send_string()はユーザー入力の文字列メッセージを送信します
# REQ-REPモードでは、REQソケットはリクエストを送信した後、応答を待つ必要があり、連続してリクエストを送信できません
print(f"リクエストを送信: '{user_input}'")
socket.send_string(user_input)
# 5. 応答を受信
# socket.recv_string()はブロックし、サーバーの応答メッセージを受信するまで待機します
reply_message = socket.recv_string()
print(f"応答を受信: '{reply_message}'")
print("-" * 20) # 区切り線
except KeyboardInterrupt:
print("\nCtrl+Cが検出されました。クライアントをシャットダウンしています...")
finally:
# ZeroMQリソースをクリーンアップ
socket.close()
context.term()
print("クライアントが安全にシャットダウンしました。")
非同期:zmq.asyncio#
デフォルトの ZeroMQ ソケットはブロッキングです。Python アプリケーションがasyncio
に基づいて構築されている場合、コルーチン内でブロッキングのsocket.recv()
またはsocket.send()
を呼び出すと、全体のイベントループが一時停止し、他のすべてのコルーチンが実行できなくなり、非同期の利点が失われます。
zmq.asyncio
サブモジュールはこの問題を解決するために生まれました。これは、ZeroMQ ソケットの非同期バージョンを提供し、そのsend()
およびrecv()
メソッドは ** 待機可能(awaitable)** に変わります。
zmq.asyncio
の使用:
zmq.asyncio
をインポートし、通常はエイリアスazmq
を使用します:import zmq.asyncio as azmq
。- 非同期コンテキストを作成します:
context = azmq.Context()
。このコンテキストは、現在のasyncio
イベントループを自動的に感知し、統合します。 - ソケットを作成します:
socket = context.socket(socket_type)
。このコンテキストから作成されたソケットは非同期特性を持ちます。 - コルーチン内で
await
を使用して非同期ソケットメソッドを呼び出します:await socket.send(...)
,await socket.recv(...)
。 - コルーチンが非同期ソケット操作を
await
すると、その操作がすぐに完了しない場合(たとえば、メッセージが受信されていない場合)、現在のコルーチンは一時停止し、制御をasyncio
イベントループに譲渡します。これにより、イベントループは他の準備が整ったコルーチンを実行できます。ソケット操作が完了すると、イベントループは通知し、そのコルーチンを再開します。
非同期(Asyncio)ソケットの例(簡易版 inproc 通信):
# inproc_asyncio_example.py - 同一プロセス内でinproc://伝送プロトコルを使用
import asyncio
import zmq
import zmq.asyncio as azmq # 非同期バージョンを使用
# inprocアドレスを定義
INPROC_ADDRESS = "inproc://my_async_channel"
# 非同期REPワーカーコルーチン(同一プロセス内で実行)
async def async_rep_worker(context: azmq.Context):
# 渡された非同期コンテキストからREPソケットを作成
socket = context.socket(zmq.REP)
# inprocアドレスにバインド
socket.bind(INPROC_ADDRESS)
print(f"REPワーカー(プロセス内)が起動しました。バインド先: {INPROC_ADDRESS}")
try:
while True:
# 非同期でリクエストを受信
message = await socket.recv_string()
print(f"REPワーカー(プロセス内)がリクエストを受信: '{message}'")
# 処理のシミュレーション
await asyncio.sleep(0.5)
reply = f"REPワーカー(プロセス内)が受信し処理しました: '{message}'"
# 非同期で応答を送信
await socket.send_string(reply)
print(f"REPワーカー(プロセス内)が応答を送信: '{reply}'")
except asyncio.CancelledError:
print("\nREPワーカー(プロセス内)がキャンセルされました。終了しています...")
finally:
socket.close()
print("REPワーカー(プロセス内)のソケットが閉じられました。")
# 非同期REQクライアントコルーチン(同一プロセス内で実行)
async def async_req_client(context: azmq.Context):
# 渡された非同期コンテキストからREQソケットを作成
socket = context.socket(zmq.REQ)
# inprocアドレスに接続(注意:このアドレスは同一プロセス内で既にバインドされている必要があります)
socket.connect(INPROC_ADDRESS)
print(f"REQクライアント(プロセス内)が起動しました。接続先: {INPROC_ADDRESS}")
try:
for i in range(3):
request = f"プロセス内リクエスト {i+1}"
print(f"REQクライアント(プロセス内)がリクエストを送信: '{request}'")
# 非同期でリクエストを送信
await socket.send_string(request)
# 非同期で応答を受信
reply = await socket.recv_string()
print(f"REQクライアント(プロセス内)が応答を受信: '{reply}'")
await asyncio.sleep(0.1) # 次のリクエストを送信する前に少し待機
finally:
socket.close()
print("REQクライアント(プロセス内)のソケットが閉じられました。")
# メイン非同期関数、ワーカーとクライアントコルーチンを起動および管理
async def main():
# メイン関数内で唯一の非同期コンテキストを作成
# このコンテキストは、inproc通信を介してすべてのソケットを作成するために使用されます
context = azmq.Context()
print("メインプログラム: Asyncioコンテキストが作成されました")
# asyncio.create_taskを使用してワーカーとクライアントコルーチンを起動
# これらは同一のイベントループ、同一プロセス内で並行して実行されます
worker_task = asyncio.create_task(async_rep_worker(context))
client_task = asyncio.create_task(async_req_client(context))
# クライアントタスクがリクエストを完了するのを待機
await client_task
print("メインプログラム: クライアントタスクが完了しました。")
# クライアントタスクが完了した後、ワーカータスクをキャンセルして終了します
worker_task.cancel()
try:
await worker_task # ワーカータスクがキャンセル信号に応答するのを待機
except asyncio.CancelledError:
print("メインプログラム: ワーカータスクがキャンセルされました。")
# コンテキストのクリーンアップは通常asyncio.run()が担当しますが、手動でcontext.term()を呼び出すこともできます
# context.term() # 手動でループを管理する場合は、termを呼び出す必要があります
# プログラムのエントリーポイント
if __name__ == "__main__":
print("--- プロセス内ZeroMQ(inproc)例が起動しました ---")
# asyncio.runを使用してメイン非同期関数を実行
# これにより、イベントループが起動し、同一プロセス内でworker_taskとclient_taskがスケジュールされます
asyncio.run(main())
print("--- プロセス内ZeroMQ(inproc)例が終了しました ---")
ZeroMQ の適用シーン#
- マイクロサービス間の通信を構築: 柔軟なメッセージルーティングと効率的な転送を提供します。
- 分散タスクキュー: PUSH/PULL モードを使用してタスクをワーカー集団に配布します。
- データの公開と購読システム: PUB/SUB モードを使用してデータを複数の消費者に効率的にブロードキャストします。
- 高性能なデータパイプライン: 異なるアプリケーションコンポーネント間で大量のメッセージを迅速に転送します。
- 複雑な原始ソケットプログラミングの代替: 多対多、一対多などの複雑な通信トポロジーが必要な場合、ZMQ のパターンはコードを大幅に簡素化します。
- 高性能が必要だが、重量級のブローカーを導入したくないシーン。
この記事は Mix Space によって xLog に同期更新されました。原始リンクは https://blog.kanes.top/posts/default/zeromq-practice