Featured image of post 淺談Airflow自動化與雲端服務的那兩三事

淺談Airflow自動化與雲端服務的那兩三事

原來GCP的開發體驗可以這麼好?

簡介

在網路爬蟲與資料搜集中,通過向既有的API索取資料,並儲存至資料湖,並更近一步的整理至資料倉儲是一個很常見的資料管線。達成這一流程的方法很多,在使用Airflow建構資料管線上,無論是使用PythonOperator自己建構任務,或是使用社群提供的各個Operator都能達成任務。今天我們將來淺談一下,若是在盡量不使用自訂的PythonOperator的前提下,僅使用AWS以及GCPOperator進行資料的索取以及上傳,會需要做些什麼,或是會遇到什麼問題。以下是我們的假設情境示意圖:

假設情境

資料來源

在這次的示範中,我們將使用台灣氣象署的公開API服務,使用上需要申請一組Token,若有興趣嘗試者,請自行至網站註冊並取得Token

AWS

在AWS方面,我們可以使用AWS S3作為資料湖,並使用AWS Redshift作為資料倉儲。

取得資料

在Airflow的工具中,有HttpToS3Operator可以協助我們把向API索取資料,並且上傳至指定S3容器的流程包裝成一個動作。以下是一個範例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
from airflow import DAG
from airflow.models import Variable
from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator

with DAG(...):
    token = Variable.get('cwa_auth_token')
    s3_bucket_name = Variable.get('s3-dev-bucket-name')
    get_recent_weather_task = HttpToS3Operator(
        task_id='get_weather_to_s3',
        http_conn_id="cwa_real_time_api",
        endpoint="/api/v1/rest/datastore/O-A0003-001",
        method="GET",
        data={
            'Authorization': f'{token}',
            'format': "JSON",
        },
        headers={"Content-Type": "application/json"},
        log_response=True,
        s3_bucket=f"{s3_bucket_name}",
        s3_key="weather_record/weather_report_10min-{{ execution_date }}_v2.json",
        aws_conn_id="aws_s3_conn",
    )

這部分相當的方便,等於是可以省去先使用HTTPOperator取得資料,再使用自訂的PythonOperator搭配boto3上傳到S3的兩步驟流程。

將資料轉移至Redshift

針對「自S3轉移資料至Redshift」這個動作來說,有幾個前置動作需要處理。以下是必要的幾個動作:

  1. 建立Redshift Serverless或是Redshift Cluster
  2. 建立對應的Schema以及Table
  3. Redshift的連線資訊,以及對應的使用者資訊寫入Airflow的連線資料以供後續使用。

在AWS所提供的Airflow工具中,S3ToRedshiftOperator可以協助我們使用COPY的方式將資料從S3搬運至Redshift;由於是COPY,所以必須先建立好對應的資料表才能正常運作。以下是一個範例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
from airflow import DAG
from airflow.models import Variable
from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

with DAG(...):
    # ...

    s3_to_redshift = S3ToRedshiftOperator(
        task_id='s3_to_redshift',
        redshift_conn_id="redshift_conn",
        aws_conn_id="aws_s3_conn",
        s3_bucket=s3_bucket_name,
        s3_key="<your file name>",
        schema="<your schema>",
        table="<your table>",
        method="<your method>",
    )

如果希望只取用部分的資料,可以使用jsonpaths.json檔案來宣告對應的資料位置,並作為COPY的選項提供給S3ToRedshiftOperator。假設我們索取的的資料結構如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
{
    "success": "...",
    "result": "...",
    "records": {
        "Station": [
            {
                "StationName": "<StationName>",
                "StationId": "<StationId>",
                "...": "..."
            },
            {
                "StationName": "<StationName>",
                "StationId": "<StationId>",
                "...": "..."
            }
        ]
    }
}

若我們希望只取出列表中第一個元素的StationName以及StationId並存放到Redshift的資料表中,我們可以建立以下的jsonpaths.json,並放置在S3的容器中:

1
2
3
4
5
6
{
  "jsonpaths": [
    "$['records']['Station'][0]['StationName']",
    "$['records']['Station'][0]['StationId']"
  ]
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from airflow import DAG
from airflow.models import Variable
from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

with DAG(...):
    # ...

    s3_to_redshift = S3ToRedshiftOperator(
        task_id='s3_to_redshift',
        redshift_conn_id="redshift_conn",
        aws_conn_id="aws_s3_conn",
        s3_bucket=s3_bucket_name,
        s3_key="<your file name>",
        schema="<your schema>",
        table="<your table>",
        method="<your method>",
        copy_options=["JSON 's3://<path to your jsonpaths.json>'"],
    )

通常我們會把jsonpaths.json放在S3的容器中,但這不是唯一的做法。

AWS小結以及已知問題

透過這HttpToS3Operator以及HttpToS3Operator,我們完成透過HTTP取得資料,並依序存放至資料湖以及資料倉儲。但目前已知有一個問題:假設特定JSON欄位是一個列表的資料形式,以目前的jsonpaths.json,無法透過使用[*]的方法來配對所有的內部元素。此外,在AWS所提供的功能中,不能在搬運資料的同時建立對應的資料表,而必須先建立好對應的資料表。

GCP

在AWS方面,我們可以使用Google Cloud Storage作為資料湖,並使用BigQuery作為資料倉儲。

取得資料

在GCP所提供的Airflow工具中,並沒有類似AWS的HttpToS3Operator協助我們將資料的請求以及到資料湖之間的傳輸簡化為一個步驟。我們可以透過使用PythonOperator搭配HTTPOperator來達成資料的請求以及上傳的任務。以下是範例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import json
import boto3

from botocore.exceptions import ClientError

from airflow import DAG
from airflow.models import Variable
from airflow.providers.http.operators.http import HttpOperator
from airflow.decorators import task
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta

@task(task_id='upload_s3', trigger_rule=TriggerRule.ONE_SUCCESS, retries=3, retry_delay=timedelta(minutes=1))
def upload_s3(ti, **context):
    s3_id = Variable.get('s3-side-project-id')
    s3_key = Variable.get('s3-side-project-key')
    s3_bucket_name = Variable.get('s3-dev-bucket-name')
    s3_region_name = Variable.get('s3-default-region')
    
    data = ti.xcom_pull(task_ids='ping_cwa_api_task')
    data = json.loads(data)
    
    s3 = boto3.client(
        's3',
        region_name=s3_region_name,
        aws_access_key_id=s3_id,
        aws_secret_access_key=s3_key,
    )
    
    try:

        file_key = f'weather_report_10min-{execution_date}.json'
        s3.put_object(
            Body=json.dumps(data),
            Bucket=s3_bucket_name,
            Key=file_key
        )
        return file_key
        
    except ClientError as e:
        raise e

with DAG(...):
    token = Variable.get('cwa_auth_token')
    s3_bucket_name = Variable.get('s3-dev-bucket-name')

    get_data_task = HttpOperator(
        task_id='get_data_task',
        http_conn_id='cwa_real_time_api',
        endpoint="/api/v1/rest/datastore/O-A0003-001",
        method="GET",
        data={
            'Authorization': f'{token}',
            'format': "JSON",
        },
        headers={"Content-Type": "application/json"},
        log_response=True,
    )

    get_payload >> upload_s3()

將資料轉移至BigQuery

接下來,我們可以使用GCP所提供的GCSToBigQueryOperator來將位於GCS中的JSON檔案轉移至BigQuery中。在這邊,我們假設對應的BigQuery Dataset已經存在,但我們要寫入的表並未存在。以下是使用GCSToBigQueryOperator搬運資料的範例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# ... import dependencies

# ... upload task

with DAG(...):
    # ...

    PROJECT_ID = "<your gcp project id>"
    DATASET_NAME = "<your dataset name>"
    TABLE_NAME = "<your table name>"  

    gsc_to_bq_task = GCSToBigQueryOperator(
        task_id='gcs_to_bq_task',
        bucket=Variable.get("gcs-landing-bucket"),
        source_objects=["weather_report_10min-{{ execution_date }}.json"],
        destination_project_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}",
        source_format='NEWLINE_DELIMITED_JSON',
        write_disposition="WRITE_APPEND",
        ignore_unknown_values=True,
        autodetect=False,
    )

    get_payload >> upload_s3() >> gsc_to_bq_task

同樣的,若是我們希望對資料的部分內容做篩選,我們可以使用GCSToBigQueryOperator所提供的schema_fields參數,直接宣告希望篩選的結構,不需要額外撰寫並上傳jsonpaths.json檔案。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# ... import dependencies

# ... upload task

with DAG(...):
    # ...

    PROJECT_ID = "<your gcp project id>"
    DATASET_NAME = "<your dataset name>"
    TABLE_NAME = "<your table name>"  

    gsc_to_bq_task = GCSToBigQueryOperator(
        task_id='gcs_to_bq_task',
        bucket=Variable.get("gcs-landing-bucket"),
        source_objects=["weather_report_10min-{{ execution_date }}.json"],
        destination_project_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}",
        source_format='NEWLINE_DELIMITED_JSON',
        schema_fields=[
            {
                "name": "records", 
                "type": "RECORD", 
                "mode": "REPEATED",
                "fields": [
                    {
                        "name": "Station", 
                        "type": "RECORD", 
                        "mode": "REPEATED",
                        "fields": [
                            {"name": "StationName", "type": "STRING", "mode": "Required"},
                            {"name": "StationId", "type": "INTEGER", "mode": "Required"},
                        ],
                    },

                ],
            },
        ],
        write_disposition="WRITE_APPEND",
        ignore_unknown_values=True,
        autodetect=False,
    )

    get_payload >> upload_s3() >> gsc_to_bq_task

這點相較於AWS所提供的S3ToRedshiftOperator來說,方便許多,且提供了更彈性的設定方式,個人對此評價頗高。

如果有需要在DAG內加入建立BigQuery Dataset的任務,可以使用BigQueryCreateEmptyDatasetOperator來達成。

GCP小結

在GCP所提供的Airflow工具中,雖然沒有像AWS所提供的HttpToS3Operator可以協助我們將資料的取得以及上傳流程進行簡化,但提供了更便利且彈性的GCSToBigQueryOperator讓我們將資料篩選並轉移至資料倉儲之中。且GCP的GCSToBigQueryOperator可以在資料表不存在的狀況下直接建立資料表,相對於AWS的解決方案要來得更便利。

小結

在這篇文章中,我們簡單的介紹了在一個常見的場景下,可以如何透過Airflow搭配AWS以及GCP所提供的工具來建構資料管線,並且介紹了各自工具所提供的優點以及缺點。簡單總結一下雙方的好處:

  • AWS
    • 提供HttpToS3Operator,簡化資料取得以及上傳的流程
    • S3ToRedshiftOperator使用上較不彈性,略遜於GCSToBigQueryOperator
  • GCP
    • 並未提供類似HttpToS3Operator的便捷功能,需要自行撰寫PythonOperator搭配HttpOperator
    • GCSToBigQueryOperator提供了較為彈性且方便設定的功能,使用體驗較佳

有人或許會說,「你所提到的這些問題都可以藉由撰寫PythonOperator進行客製化來處理掉」。然而自己寫的PythonOperator需要額外的測試,而既有的工具是已經經過測試的,且能夠相容於對應的Airflow版本,在使用的相容性以及維護方面具有一定程度的優勢。

如果覺得我的文章對你有幫助,歡迎請我喝一杯咖啡~

comments powered by Disqus
使用 Hugo 建立
主題 StackJimmy 設計