ZeroMQ 実践#
ZeroMQは、高性能の非同期メッセージライブラリまたは並行フレームワークです。複雑な低レベルのネットワーク通信の詳細を抽象化し、複雑な分散アプリケーションの構築を簡単にするための柔軟なメッセージパターンのセットを提供します。
ZeroMQ の核心理念:パターン、抽象化、パフォーマンス#
従来の中央集権的なメッセージブローカーとは異なり、ZeroMQ はより分散型(ブローカーなし)または分散の設計理念を提唱しています(もちろん、それに基づいてブローカーを構築することも可能です)。その核心的な利点は次のとおりです:
-
メッセージパターン(Messaging Patterns): ZeroMQ は、ユーザーが原始的なソケットを直接操作して接続、送信、受信、エラーハンドリング、再試行などの詳細に頭を悩ませることを避けます。代わりに、いくつかの古典的で即使用可能なメッセージパターンを提供します。各パターンには特定の通信ロジックと拡張性が組み込まれており、シナリオに適したパターンを選択するだけで済みます。
- リクエスト / レスポンス(REQ/REP): 古典的なクライアント / サーバーモデル。REQ がリクエストを送信し、REP がリクエストを受信して応答します。シンプルで直感的です。
- 公開 / 購読(PUB/SUB): データ配信モデル。PUB が特定のトピックにメッセージを公開し、複数の SUB が興味のあるトピックを購読してメッセージを受信します。一対多のブロードキャストを実現します。
- プッシュ / プル(PUSH/PULL): 作業の配布と収集モデル。PUSH がタスクを複数の PULL ワーカーにプッシュし、PULL ワーカーがタスクを引き出して実行します。多対多の負荷分散と結果収集を実現します。
- ペア(PAIR): 最もシンプルなポイントツーポイントモデル。組み込みのパターンロジックはなく、通常は固定の一対一の接続に使用されます。これらのパターンは ZeroMQ の本質であり、原始的なソケットよりもはるかに高い抽象レベルを提供します。
-
ソケットの強化(Sockets on Steroids): ZeroMQ のソケットは従来のソケットとは異なります。これらはパターンのエンドポイントです。ユーザーは、接続の確立、切断、メッセージキューの管理、エラーハンドリングなどの低レベルの詳細を気にする必要はありません。ZeroMQ が内部でそれを処理します。指定されたアドレスに
bindまたはconnectし、その後sendまたはrecvでメッセージを送信します。 -
高性能とスケーラビリティ: ZeroMQ は、設計当初からパフォーマンスを考慮しています。非同期 I/O、スマートなメッセージバッチ処理とルーティングを使用し、従来のメッセージキューのボトルネックを回避します。その分散型の特性は、中心的なブローカーの単一障害点の圧力と故障リスクがないことも意味します(ブローカーを構築することを選択しない限り)。
ZeroMQ の核心コンポーネント:Context、Socket、Poller#
-
Context(コンテキスト):
- ZeroMQ の実行環境の管理者であり、リソースの割り当てと管理を担当します。これには、低レベルの I/O スレッドが含まれます。
- コンテキストは ZeroMQ の工場として理解できます。すべてのソケットはコンテキストを通じて作成する必要があります(
context.socket(...))。 - 通常、アプリケーションまたはスレッドは 1 つのコンテキストを作成するだけで済みます。
-
Socket(ソケット):
- ZeroMQ でメッセージを送受信するための主要なオブジェクトです。
- 各ソケットには特定のタイプ(例:
zmq.REQ、zmq.PUBなど)があり、このタイプがソケットが従うメッセージパターンを決定します。 - ソケットは特定のアドレスに ** バインド(bind)** できます(通常はサーバー側)し、接続をリッスンします。
- ソケットは特定のアドレスに ** 接続(connect)** できます(通常はクライアント側)し、接続を開始します。
send()およびrecv()メソッドを使用してメッセージを送受信します。ZeroMQ のメッセージは ** バイト列(bytes)** であり、send_string()/recv_string()は便利な文字列処理を提供します。メッセージは複数のフレームで構成できます(send_multipart()/recv_multipart())。- 注意: デフォルトでは、
socket.send()およびsocket.recv()はブロッキングです。バッファが満杯またはメッセージがない場合、現在のスレッドの実行が一時停止します。
-
Poller(ポーラー):
- 従来の同期(ブロッキング)ZeroMQ プログラミングでは、複数のソケットのメッセージを同時にリッスンする必要がある場合、1 つのソケットで
recv()を呼び出すとブロックされ、他のソケットのメッセージを処理できなくなります。 zmq.Pollerはこの問題を解決するために使用されます。ポーラーに関心のある複数のソケットとイベント(たとえば、zmq.POLLINはメッセージが読み取れることを示します)を登録できます。- その後、
poller.poll(timeout)メソッドを呼び出します。このメソッドはブロックしますが、すべての登録されたソケットを監視します。関心のあるイベントが発生したソケットがあると、poll()は戻り、どのソケットが準備できているかを通知します(たとえば、recv()を呼び出すことができます)。 - 重要: 非同期 ZeroMQ(
zmq.asyncio)を使用する場合、通常は直接zmq.Pollerを使用する必要はありません。非同期フレームワーク(asyncio イベントループ)が低レベルのイベント監視とスケジューリングを担当します。
- 従来の同期(ブロッキング)ZeroMQ プログラミングでは、複数のソケットのメッセージを同時にリッスンする必要がある場合、1 つのソケットで
ZeroMQ の伝送プロトコル:tcp、ipc、inproc#
-
tcp://:- TCP/IP プロトコルに基づいています。
- プロセス間またはマシン間のネットワーク通信に使用されます。
- アドレス形式:
tcp://host:port(例:tcp://127.0.0.1:5555またはtcp://*:5555)。
-
ipc://:- ローカルの ** プロセス間通信(IPC)** メカニズム(例:Unix ドメインソケットまたは Windows 命名パイプ)に基づいています。
- 同一マシンの異なるプロセス間の通信に使用されます。
- 通常、
tcp://よりも速いです。 - アドレス形式:
ipc://pathname(例:ipc:///tmp/my_socket)。
-
inproc://:- プロセス内メモリ転送に基づいています。
- 同一オペレーティングシステムプロセス内の異なるスレッドまたはコルーチン間の通信にのみ使用できます。
- 非常に高速で、ネットワークオーバーヘッドがありません。
- 非常に重要: あるプロセスで
bindされたinproc://アドレスは、他のプロセスからは完全に見えず、接続できません。 - アドレス形式:
inproc://transport_name(例:inproc://my_internal_channel)。
- コード例

非同期:zmq.asyncio#
デフォルトの ZeroMQ ソケットはブロッキングです。Python アプリケーションがasyncioに基づいて構築されている場合、コルーチン内でブロッキングのsocket.recv()またはsocket.send()を呼び出すと、全体のイベントループが一時停止し、他のすべてのコルーチンが実行できなくなり、非同期の利点が失われます。
zmq.asyncioサブモジュールはこの問題を解決するために生まれました。これは、ZeroMQ ソケットの非同期バージョンを提供し、そのsend()およびrecv()メソッドは ** 待機可能(awaitable)** に変わります。
zmq.asyncioの使用:
zmq.asyncioをインポートし、通常はエイリアスazmqを使用します:import zmq.asyncio as azmq。- 非同期コンテキストを作成します:
context = azmq.Context()。このコンテキストは、現在のasyncioイベントループを自動的に感知し、統合します。 - ソケットを作成します:
socket = context.socket(socket_type)。このコンテキストから作成されたソケットは非同期特性を持ちます。 - コルーチン内で
awaitを使用して非同期ソケットメソッドを呼び出します:await socket.send(...),await socket.recv(...)。 - コルーチンが非同期ソケット操作を
awaitすると、その操作がすぐに完了しない場合(たとえば、メッセージが受信されていない場合)、現在のコルーチンは一時停止し、制御をasyncioイベントループに譲渡します。これにより、イベントループは他の準備が整ったコルーチンを実行できます。ソケット操作が完了すると、イベントループは通知し、そのコルーチンを再開します。
非同期(Asyncio)ソケットの例(簡易版 inproc 通信):

ZeroMQ の適用シーン#
- マイクロサービス間の通信を構築: 柔軟なメッセージルーティングと効率的な転送を提供します。
- 分散タスクキュー: PUSH/PULL モードを使用してタスクをワーカー集団に配布します。
- データの公開と購読システム: PUB/SUB モードを使用してデータを複数の消費者に効率的にブロードキャストします。
- 高性能なデータパイプライン: 異なるアプリケーションコンポーネント間で大量のメッセージを迅速に転送します。
- 複雑な原始ソケットプログラミングの代替: 多対多、一対多などの複雑な通信トポロジーが必要な場合、ZMQ のパターンはコードを大幅に簡素化します。
- 高性能が必要だが、重量級のブローカーを導入したくないシーン。
この記事は Mix Space によって xLog に同期更新されました。原始リンクは https://blog.kanes.top/posts/default/zeromq-practice