Featured image of post 為基於CeleryExecutor的Airflow增加Worker

為基於CeleryExecutor的Airflow增加Worker

人多勢眾,眾志成城,來幫Airflow多增加幾個幫手吧!

簡介

Celery是一個簡單、靈活且可靠的分佈式任務佇列系統,廣泛用於處理並行任務。在Airflow中,CeleryExecutor是一個受歡迎的選擇,它能夠將任務分佈到多個節點(Workers)上,從而實現分佈式部署。這種架構使Airflow能夠通過增加更多的Worker來增加系統的處理能力,從而有效管理大規模數據管道,並確保任務在指定的時間內完成。使用Docker Compose架設帶有CeleryExecutor的Airflow提供了一個靈活且可管理的解決方案,以應對不同的工作負載。

前置準備

  • 請確保現有的Airflow服務可以被worker訪問。
  • 我們將使Airflow網站提供的docker-compose.yaml文件。通過編輯該配置文件,我們可以為現有的基於CeleryExecutor的Airflow服務添加一個額外的worker。
  • 請確保worker所在的節點可以與主要節點同步dags/資料夾下的內容!若workers節點不存在正確的DAG檔案,將造成任務運作失敗!

編輯設定

移除冗餘組件

由於我們只想為現有的Airflow服務添加一個額外的worker,因此不需要額外的PostgresRedisSchedulerTriggerWebserverFlower組件,所以我們可以將這些組件從 docker-compose.yaml文件中移除。

此外,由於我們不會啟動新的PostgresRedis服務,因此需要移除docker-compose.yaml文件中的depends_on部分,以防止發生錯誤。

設定連線

x-airflow-common - environment部分的環境變數。請填寫必要的資訊,以便worker可以訪問既有的PostgresRedis

1
2
3
4
5
6
7
8
9
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.0}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@<airflow_ip>:<postgres port>/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@<airflow_ip>:<postgres port>/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@1<airflow_ip>:<redis port>/0

編輯.env

就像我們在前一篇文章中所做的一樣,我們需要在啟動服務前設定AIRFLOW_UID

1
echo -e "AIRFLOW_UID=$(id -u)" > .env

啟動Worker

我們可以通過以下命令啟動worker:

1
docker-compose up -d

檢查 worker 狀態

我們可以檢查worker是否已連接到現有服務。請啟動Flower網頁以查看狀態。如果worker成功連接到現有服務,網頁中應存在兩個worker: Flower網頁

警告

請務必再次確認worker節點是否有與主要節點同步dags/資料夾下的內容,以免造成任務失敗!

comments powered by Disqus
使用 Hugo 建立
主題 StackJimmy 設計