banner
kanes

kanes

ZeroMQ實踐

ZeroMQ 實踐#

ZeroMQ,是一個高性能的異步消息庫並發框架。將複雜的底層網絡通信細節抽象化,提供了一系列靈活的消息模式,讓構建複雜的分佈式應用變得更簡單。

ZeroMQ 的核心理念:模式、抽象與性能#

與傳統的中心化消息 Broker 不同,ZeroMQ 倡導一種更加去中心化 (brokerless)分佈式的設計理念(當然,也可以基於它構建 Broker)。它的核心優勢在於:

  1. 消息模式 (Messaging Patterns): ZeroMQ 沒有讓用戶直接操作原始的 Socket 去頭疼連接、發送、接收、錯誤處理、重試等細節。相反,它提供了幾種經典的、開箱即用的消息模式。每種模式都內置了特定的通信邏輯和擴展性能力,只需要選擇適合場景的模式即可。

    • 請求 / 應答 (REQ/REP): 經典的客戶端 / 服務器模式。REQ 發送請求,REP 接收請求並回應答。簡單直觀。
    • 發布 / 訂閱 (PUB/SUB): 一種數據分發模式。PUB 向某個主題發布消息,多個 SUB 訂閱感興趣的主題並接收消息。實現一對多廣播。
    • 推 / 拉 (PUSH/PULL): 一種工作分發和收集模式。PUSH 將任務推給多個 PULL Worker,PULL Worker 拉取任務執行。實現多對多負載均衡和結果收集。
    • 成對 (PAIR): 最簡單的點對點模式。沒有內置模式邏輯,通常用於固定的一對一連接。 這些模式是 ZeroMQ 的精髓,它們提供了比原始 Socket 高得多的抽象級別。
  2. Socket 的增強 (Sockets on Steroids): ZeroMQ 的 Socket 和傳統的 Socket 不是一回事。它們是模式的端點。用戶不需要關心底層的連接建立、斷開、消息隊列管理、錯誤處理等,ZeroMQ 在內部幫你搞定了。只需要 bindconnect 到指定的地址,然後 sendrecv 消息。

  3. 高性能與可伸縮性: ZeroMQ 設計之初就考慮了性能。使用異步 I/O,智能的消息批處理和路由,避免許多傳統消息隊列的瓶頸。其去中心化的特性也意味著沒有中心 Broker 的單點壓力和故障風險(除非選擇構建 Broker)。

ZeroMQ 的核心組件:Context、Socket、Poller#

  1. Context (上下文):

    • ZeroMQ 運行時環境的管理者,負責資源的分配和管理,包括底層的 I/O 線程。
    • 可以將 Context 理解為 ZeroMQ 的工廠,所有的 Socket 都必須通過 Context 來創建 (context.socket(...))。
    • 通常一個應用或一個線程只需要創建一個 Context。
  2. Socket (套接字):

    • ZeroMQ 中進行消息傳輸的主要對象
    • 每個 Socket 都有一個特定的類型(如 zmq.REQ, zmq.PUB 等),這個類型決定 Socket 遵循哪種消息模式。
    • Socket 可以綁定 (bind) 到一個地址(通常是服務器端),監聽連接。
    • Socket 可以連接 (connect) 到一個地址(通常是客戶端),發起連接。
    • 透過 send()recv() 方法來發送和接收消息。ZeroMQ 的消息是字節串 (bytes)send_string()/recv_string() 提供方便的字符串處理。消息可以由多個幀組成 (send_multipart()/recv_multipart())。
    • 注意: 默認情況下,socket.send()socket.recv()阻塞的。如果緩衝區滿或沒有消息,它們會暫停當前線程的執行。
  3. Poller (輪詢器):

    • 在傳統的同步 (阻塞) ZeroMQ 編程中,如果需要同時監聽多個 Socket 的消息,直接在一個 Socket 上調用 recv() 會阻塞住,則無法處理其他 Socket 的消息。
    • zmq.Poller 就是用來解決這個問題的。可以向 Poller 註冊多個關心的 Socket 和事件(比如 zmq.POLLIN 表示有消息可讀)。
    • 然後調用 poller.poll(timeout) 方法。這個方法會阻塞,但它同時監控所有註冊的 Socket。一旦有 Socket 發生了關心的事件,poll() 就會返回,告知哪些 Socket 已經準備好了(比如可以調用 recv() 了)。
    • 重要: 在使用異步 ZeroMQ (zmq.asyncio) 時,通常不需要直接使用 zmq.Poller,因為異步框架(asyncio 事件循環)會負責底層的事件監控和調度。

ZeroMQ 的傳輸協議:tcp、ipc、inproc#

  1. tcp://:

    • 基於 TCP/IP 協議。
    • 用於進程間機器間的網絡通信。
    • 地址格式:tcp://host:port (如 tcp://127.0.0.1:5555tcp://*:5555)。
  2. ipc://:

    • 基於本地進程間通信 (IPC) 機制(如 Unix Domain Sockets 或 Windows 命名管道)。
    • 用於同一機器的不同進程間通信。
    • 通常比 tcp:// 更快。
    • 地址格式:ipc://pathname (如 ipc:///tmp/my_socket).
  3. inproc://:

    • 基於進程內內存傳遞
    • 只能用於同一操作系統進程內的不同線程或協程之間通信
    • 速度極快,沒有網絡開銷。
    • 非常重要: 一個進程中 bindinproc:// 地址,在其他進程中是完全不可見且無法連接的。
    • 地址格式:inproc://transport_name (如 inproc://my_internal_channel).
  • 代碼實例
# zmq_server.py - ZeroMQ 請求應答模式的服務器端

import zmq
import time

# 1. 創建 ZeroMQ Context 對象
# Context 是 ZeroMQ 運行時環境的管理者
context = zmq.Context()

# 2. 創建一個 REP (Reply) Socket
# REP Socket 用於接收請求並發送應答
socket = context.socket(zmq.REP)

# 3. 將 Socket 綁定到一個地址
# "tcp://*:5555" 表示使用 TCP 協議,綁定到所有可用網絡接口的 5555 端口
# "*" 表示綁定到所有本地地址,方便客戶端連接
bind_address = "tcp://localhost:5555"
socket.bind(bind_address)

print(f"ZeroMQ REP 服務器已啟動,綁定在 {bind_address}")
print("等待接收請求...")

try:
    # 服務器通常在一個無限循環中運行,不斷接收和處理請求
    while True:
        # 4. 接收請求
        # socket.recv_string() 會阻塞,直到收到一個字符串消息
        message = socket.recv_string()
        print(f"收到請求: '{message}'")

        # 模擬處理請求的過程
        time.sleep(1) # 假裝服務器需要處理一下

        # 5. 準備應答消息
        reply_message = f"服務器收到你的消息: '{message}'"

        # 6. 發送應答
        # socket.send_string() 會發送一個字符串應答
        # 在 REP-REQ 模式中,REP Socket 必須在發送應答前先接收一個請求
        socket.send_string(reply_message)
        print(f"發送應答: '{reply_message}'")

except KeyboardInterrupt:
    print("\n檢測到 Ctrl+C,正在關閉服務器...")
finally:
    # 清理 ZeroMQ 資源
    socket.close()
    context.term()
    print("服務器已安全關閉。")
# zmq_client.py - ZeroMQ 請求應答模式的客戶端

import zmq

# 1. 創建 ZeroMQ Context 對象
context = zmq.Context()

# 2. 創建一個 REQ (Request) Socket
# REQ Socket 用於發送請求並接收應答
socket = context.socket(zmq.REQ)

# 3. 連接到服務器的地址
# "tcp://localhost:5555" 表示使用 TCP 協議,連接到本地的 5555 端口
# 如果服務器在另一台機器上,請將 'localhost' 替換為服務器的實際 IP 地址
connect_address = "tcp://localhost:5555"
socket.connect(connect_address)

print(f"ZeroMQ REQ 客戶端已啟動,連接到 {connect_address}")
print("你可以輸入消息發送給服務器,輸入 'quit' 退出。")

try:
    # 客戶端通常在一個循環中運行,允許發送多條消息
    while True:
        # 獲取用戶輸入
        user_input = input("請輸入消息: ")

        # 檢查是否輸入 'quit'
        if user_input.lower() == 'quit':
            break

        # 4. 發送請求
        # socket.send_string() 會發送用戶輸入的字符串消息
        # 在 REQ-REP 模式中,REQ Socket 必須在發送請求後等待一個應答,不能連續發送請求
        print(f"發送請求: '{user_input}'")
        socket.send_string(user_input)

        # 5. 接收應答
        # socket.recv_string() 會阻塞,直到收到服務器的應答消息
        reply_message = socket.recv_string()
        print(f"收到應答: '{reply_message}'")
        print("-" * 20) # 分隔線

except KeyboardInterrupt:
    print("\n檢測到 Ctrl+C,正在關閉客戶端...")
finally:
    # 清理 ZeroMQ 資源
    socket.close()
    context.term()
    print("客戶端已安全關閉。")

image.png|700x453


異步:zmq.asyncio#

默認的 ZeroMQ Socket 是阻塞的。如果 Python 應用是基於 asyncio 構建的,那麼在一個協程中調用阻塞的 socket.recv()socket.send()暫停整個事件循環,導致其他所有協程都無法運行,異步的優勢蕩然無存。

zmq.asyncio 子模塊就是為了解決這個問題而生的。它提供了 ZeroMQ Socket 的異步版本,其 send()recv() 方法變成了可等待的 (awaitable)

使用 zmq.asyncio

  1. 導入 zmq.asyncio,通常取別名 azmqimport zmq.asyncio as azmq
  2. 創建異步 Context:context = azmq.Context()。這個 Context 會自動感知並集成當前的 asyncio 事件循環。
  3. 創建的 Socket:socket = context.socket(socket_type)。從這個 Context 創建的 Socket 具有異步特性。
  4. 在協程中使用 await 調用異步 Socket 方法:await socket.send(...), await socket.recv(...)
  5. 當一個協程 await 一個異步 Socket 操作時,如果該操作不能立即完成(比如沒有收到消息),當前協程會暫停讓出控制權asyncio 事件循環,允許事件循環去執行其他準備好的協程。當 Socket 操作完成後,事件循環會通知並恢復該協程。

異步 (Asyncio) Socket 示例 (簡版 inproc 通信):

# inproc_asyncio_example.py - 在同一個進程內使用 inproc:// 傳輸協議

import asyncio
import zmq
import zmq.asyncio as azmq # 使用異步版本

# 定義 inproc 地址
INPROC_ADDRESS = "inproc://my_async_channel"

# 異步 REP Worker 協程 (運行在同一個進程內)
async def async_rep_worker(context: azmq.Context):
    # 從傳入的異步 Context 創建 REP Socket
    socket = context.socket(zmq.REP)
    # 綁定到 inproc 地址
    socket.bind(INPROC_ADDRESS)
    print(f"REP Worker (進程內) 已啟動,綁定到 {INPROC_ADDRESS}")

    try:
        while True:
            # 異步接收請求
            message = await socket.recv_string()
            print(f"REP Worker (進程內) 收到請求: '{message}'")

            # 模擬處理
            await asyncio.sleep(0.5)

            reply = f"REP Worker (進程內) 收到並處理了: '{message}'"
            # 異步發送應答
            await socket.send_string(reply)
            print(f"REP Worker (進程內) 發送應答: '{reply}'")

    except asyncio.CancelledError:
        print("\nREP Worker (進程內) 被取消,正在退出...")
    finally:
        socket.close()
        print("REP Worker (進程內) Socket 已關閉。")


# 異步 REQ Client 協程 (運行在同一個進程內)
async def async_req_client(context: azmq.Context):
     # 從傳入的異步 Context 創建 REQ Socket
    socket = context.socket(zmq.REQ)
    # 連接到 inproc 地址 (注意:這個地址必須在同一個進程內已經被某個 socket 綁定了)
    socket.connect(INPROC_ADDRESS)
    print(f"REQ Client (進程內) 已啟動,連接到 {INPROC_ADDRESS}")

    try:
        for i in range(3):
            request = f"進程內請求 {i+1}"
            print(f"REQ Client (進程內) 發送請求: '{request}'")
            # 異步發送請求
            await socket.send_string(request)

            # 異步接收應答
            reply = await socket.recv_string()
            print(f"REQ Client (進程內) 收到應答: '{reply}'")
            await asyncio.sleep(0.1) # 稍微等一下再發下個請求
            
    finally:
        socket.close()
        print("REQ Client (進程內) Socket 已關閉。")

# 主異步函數,啟動和管理 Worker 和 Client 協程
async def main():
    # 在主函數中創建唯一一個異步 Context
    # 這個 Context 將用於創建所有需要通過 inproc 通信的 Socket
    context = azmq.Context()
    print("主程序: Asyncio Context 已創建")

    # 使用 asyncio.create_task 啟動 Worker 和 Client 協程
    # 它們將在同一個事件循環、同一個進程內並發運行
    worker_task = asyncio.create_task(async_rep_worker(context))
    client_task = asyncio.create_task(async_req_client(context))

    # 等待 Client 任務完成它的請求
    await client_task
    print("主程序: Client 任務完成。")

    # Client 任務完成後,取消 Worker 任務以退出
    worker_task.cancel()
    try:
        await worker_task # 等待 Worker 任務響應取消信號
    except asyncio.CancelledError:
        print("主程序: Worker 任務已被取消。")

    # Context 的清理通常由 asyncio.run() 負責,或者可以手動 context.term()
    # context.term() # 如果手動管理循環,需要 term

# 程序的入口點
if __name__ == "__main__":
    print("--- 進程內 ZeroMQ (inproc) 示例啟動 ---")
    # 使用 asyncio.run 運行主異步函數
    # 這將啟動事件循環,並在同一個進程中調度 worker_task 和 client_task
    asyncio.run(main())
    print("--- 進程內 ZeroMQ (inproc) 示例結束 ---")

image.png|0x0

ZeroMQ 的適用場景#

  • 構建微服務之間的通信: 提供靈活的消息路由和高效傳輸。
  • 分佈式任務隊列: 使用 PUSH/PULL 模式分發任務給 Worker 集群。
  • 數據發布與訂閱系統: 使用 PUB/SUB 模式高效廣播數據給多個消費者。
  • 高性能的數據管道: 在不同應用組件間快速傳遞大量消息。
  • 替代複雜的原始 Socket 編程: 當需要多對多、一對多等複雜通信拓撲時,ZMQ 的模式能大大簡化代碼。
  • 需要高性能但又不想引入重量級 Broker 的場景。

此文由 Mix Space 同步更新至 xLog
原始鏈接為 https://blog.kanes.top/posts/default/zeromq-practice


載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。