用 Celery 結合 Redis 或 RabbitMQ = 馬上開始使用 Task Queue (1)

Celery: Distributed Task Queue
想在網路上提供一些服務, 利用非對稱式 (asynchronous) 的設計來處理請求 (request) 是一件現在挺熱門的顯學。於是在接收 request 的部分有 Python 的 Tornado 或是 Node.js 之類的 non-blocking server 如雨後春筍般地湧出。

扯遠了,  我這篇不是想講非對稱式的觀念, 而是想要講如果請求中有一些繁複又耗時間的任務 (如新增使用者、寄送電子郵件等等), 因為不能將訪客的連線給拉著不放, 進而導致伺服器無法處理新的請求, 為了避免這樣的情況發生, 我們通常會先將這類的任務送進去 queue 裡面, 啟動常駐的背景程式 (daemon) 來幫我們執行這些任務, 並且將任務的執行過程與結果記錄下來。

如果我們的 Queue (Broker) 想用  Redis 或 RabbitMQ, 那麼 Producer (將任務或訊息加入 Queue 的角色) 及 Consumer (Subscriber、Worker, 將 Queue 中的訊息或任務取出並做相關對應事項的角色) 該用什麼來實作呢?

假如您用 Python 來撰寫專案, 那麼 Celery 應該是您的一個好選擇。


什麼是 Celery?


半年多前剛開始用 Python 寫專案時, 忘記看到了哪篇文章中有人推薦使用 Celery, 於是我就跳進去做了一些小的 PoC, 也順便做了一下 Celery 與其他競爭者的相關測試並回報給了作者, 在決定選用 Celery 後也恰巧在 Taipei.py 的聚會中聽到前東家趨勢科技已在其專案裡面採用 Celery。目前版本編號也已經發展到相當成熟的 3.0.20 了, 我想, 開發者在使用時應該可以比較放心吧。

那麼 Celery 到底是什麼呢? Celery 是一個用 Python 實作的分散式任務佇列 (Distributed Task Queue)。用途在實作上述有提到的 Producer 及 Consumer, 功能上則可以調整 Worker 個數、Worker 擷取執行 Broker 中訊息的速率等等, 細節與設定相當相當多, 您可以參考官方網站裡的介紹, 這裡就先暫時不多花篇幅介紹了。這馬上開始使用 Task Queue系列文章主要是將半年多前的筆記與最近再次用到時的記錄整理一下, 方便自己與大家查找。

用 Celery 的好處是?


由於 Celery 支援 AMQP 等 Protocol 因此在 Queue 的選擇上彈性很大, 無論是直接使用 Redis 或是使用功能強大的 RabbitMQ, 只要稍微修改一下設定檔就可以直接切換、無縫接軌了。

Celery 在執行的速度上也相當快, 根據半年多前使用版本 2.6.0rc4 的記錄, 在一台 MacBook Pro 13", 5400rpm, 2.5GHz Intel Core i5, 4GB 1600 MHz DDR3 SDRAM 上發送 100000 個大小為 128B 的的訊息。得到的結果為:
Producer 可產生約 80 個訊息/秒,
Consumer 則可執行約 380 個訊息/秒。
(測試執行的任務相當簡單, 只有傳遞系統目前時間而已, Producer 與 Consumer 數量皆為 1 個, queue 裡的訊息沒有 persist, 也就是只暫存在記憶體中)
半年多前的小小貢獻

馬上開始


註:文章預設您已經安裝好 Redis 或 RabbitMQ 等 Broker, 範例則是使用 Redis。儘管範例是使用 Redis 當作 Broker, 但是我跟 Celery 的作者一樣, 仍是推薦 RabbitMQ + Celery 的組合。(因為都是 vmware 的產品了, 功能支援相當齊全、測試 HA 的結果也很讓人安心。)

由於範例使用的 Broker 是 Redis, 因此安裝 Celery 只需要執行:

Application


執行 Celery 時需要有 Celery instance。在 Celery 裡頭被稱作 celery application, application, 或是簡稱為 app。有了 app 之後, 就可以在 app 裡面新增 task。

下面簡單的基礎 Celery 範例可以讓您快速地開始使用 Celery, 但是如果要將 Celery 應用在自己的專案內, 有更多的設定需要知道, 在還沒有新文章被緩慢產出前, 可以先參考 Using Celery in Your Application 這篇文章。

新建兩個任務


先新增一個 tasks.py:

啟動執行任務的 worker


執行下列的指令可以啟動一個 worker 來執行任務並啟動 log 的層級設成 debug:

將任務放進 Queue 裡


在自己的程式碼裡 import 剛剛新增的兩個任務, 並將它們加到 Queue 裡面排隊:
完成了! 可以自己玩玩看上面的程式會有什麼樣的結果, 基本這樣簡單的步驟就可以開始用 Queue 了!

更多的細節請等下一篇 -- Tornado 與 Celery 實作 = 馬上開始使用 Task Queue (2)

參考資料:

  • First Steps with Celery
    http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html
  • Calling Tasks
    http://docs.celeryproject.org/en/latest/userguide/calling.html#id1

留言