Data Engineering/AirFlow

[Airflow] ch3. Airflow 스케줄링

쟈누이 2023. 11. 15. 00:22
반응형
from datetime import datetime
from pathlib import Path

import pandas as pd
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="02_daily_schedule",
    schedule_interval="@daily",
    start_date=datetime(2019, 1, 1),
    end_date=datetime(2019, 1, 5),
)

fetch_events = BashOperator(
    task_id="fetch_events",
    bash_command=(
        "mkdir -p /data/events && "
        "curl -o /data/events.json http://events_api:5000/events"
    ),
    dag=dag,
)

def _calculate_stats(input_path, output_path):
    """Calculates event statistics."""

    events = pd.read_json(input_path)
    stats = events.groupby(["date", "user"]).size().reset_index()

    Path(output_path).parent.mkdir(exist_ok=True)
    stats.to_csv(output_path, index=False)

calculate_stats = PythonOperator(
    task_id="calculate_stats",
    python_callable=_calculate_stats,
    op_kwargs={"input_path": "/data/events.json", "output_path": "/data/stats.csv"},
    dag=dag,
)

fetch_events >> calculate_stats

2. 정기적으로 실행하기

  • 스케줄 간격을 정의하여 정기적으로 실행할 수 있음
  • DAG 초기화 시, schedule_interval 인수를 설정하여 스케줄 간격을 정의할 수 있음.
  • 디폴트 값은 None 이며, DAG 가 예약 실행되지 않고, UI 또 API 를 통해 수동으로 트리거되면 실행

2.1 스케줄 간격 정의하기

dag = DAG(
    dag_id="02_daily_schedule",
    schedule_interval="@daily", <- 매일 자정에 실행되도록 DAG 스케줄
    start_date=datetime(2019, 1, 1), <- DAG 실행 스케줄을 시작할 날짜/시간
    end_date=datetime(2019, 1, 5),
)
  • DAG 의 시작 날짜를 지정하여 Airflow 가 DAG 를 언제부터 시작할지 설정 가능
  • Airflow 는 시작 날짜를 기준으로 첫 번째 DAG의 실행을 스케줄(시작날짜 + 간격)
  • 그 다음 실행은 첫 번째 스케줄된 간격 이후, 계속해서 해당 간격으로 실행
  • Airflow 는 종료일이 없으면 DAG 를 매일 스케줄된 대로 영원히(이론적으로) 실행
  • 기간이 정해진 프로젝트에서는 end_date 에 인수를 설정하여 특정 날짜 이후에 DAG 실행중지 설정 가능.

→ 완성된 스케줄

2.2 스케줄 간격 설정하기

  • Airflow 는 더 복잡한 스케줄 간격 설정을 지원하기 위해 Cron 을 사용해 스케줄 간격을 정의 가능
  • 시간 간격을 정할 때 매우 유용
  • Airflow 는 스케줄 간격을 의미하는 약어를 사용한 몇가지 매크로를 지원함

프리셋 이름 의미

@once 1회만 실행되도록 스케줄
@hourly 매시간 변경 시 1회 실행
@daily 매일 자정에 1회 실행
@weekly 매주 일요일 자정에 1회 실행
@monthly 매월 1일 자정에 1회 실행
@yearly 매년 1월 1일 자정에 1회 실행`

2.3 빈도 기반의 스케줄 간격 설정하기

  • cron 식의 제약은 특정 빈도(frequency) 마다 스케줄을 정의할 수 없다는 것
  • 만약 DAG 를 3일마다 실행하려면, Airflow 에서 지원하는 빈도 기반 스케줄러를 사용해야 되는데 timedelta (datatime 모듈에 포함된) 인스턴스를 사용해야 한다.
# 빈도 기반 스케줄 간격 정의하기
dag = DAG(
    dag_id="02_daily_schedule",
    schedule_interval=dt.timedelta(days=3), <- timedelta는 빈도 기반 스케줄을 사용할 수 있는 기능을 제공
    start_date=datetime(2019, 1, 1), <- DAG 실행 스케줄을 시작할 날짜/시간
    end_date=datetime(2019, 1, 5),
)

→ 위와 같이 설정하면 DAG 가 시작시간으로 부터 3일마다 실행된다.

→ DAG 를 2시간 마다 (dt.timedelta(hours=2) / 10분 마다(dt.timedelta(minutes=10)) 로도 설정 가능

3. 데이터 증분 처리하기

3.1 이벤트 데이터 증분 가져오기

  • 매일 사용자 이벤트 카탈로그 전체를 다운로드하고 작업은 효율적이지 못함.
  • 이를 해결하는 방법은 순차적으로 데이터를 가져올 수 있도록 DAG 를 변경하는 것이다,
  • 스케줄 간격에 해당하는 일자의 이벤트만 로드하고 새로운 이벤트만 통계를 계산

  • 이러한 증분방식은 스케줄된 하나의 작업에서 처리해야 할 데이터 양을 크게 줄일 수 있기 때문에, 전체 데이터 세트를 처리하는 것보다 훨씬 효율적이다.
  • 또한 날짜별로 분리된 단일 파일로 저장하고 있기 때문에 API가 제한하고 있는 30일 간의 내용을 하나로 저장하지 않고 시간이 지남에 따라 매일 순차적으로 파일을 저장 가능

3.2 실행날짜를 사용하여 동적 시간 참조하기

  • Airflow 는 태스크가 실행되는 특정 간격을 정의할 수 있는 추가 매개 변수를 제공한다.
  • 중요한 매개변수는 DAG 가 실행되는 날짜와 시간을 나타내는 execution_date
  • execution_date 는 특정 날짜가 아니라 스케줄 간격으로 실행되는 시작 시간을 나타내는 타임스탬프
  • 스케줄 간격의 종료 시간은 next_execution_date 라는 매개변수를 사용
  • Airflow 는 과거의 스케줄 간격의 시작을 정의하는 previous_execution_date 매개 변수 제공 - 현재 시간 간격의 데이터와 이전 간격의 데이터를 대조하여 분석을 수행할 때 유

  • Airflow 에서는 실행 날짜를 오퍼레이터에서 참조하여 사용 가능
  • 예를 들어 BashOperator 에서 Airflow 의 탬플릿 기능을 사용하여 배시 명령이 실행될 날짜를 동적으로 포함 가능
# 특정 날짜 지정을 위해 탬플릿 사용하기
fetch_events = BashOperator(
    task_id="fetch_events",
    bash_command=(
        "mkdir -p /data/events && "
        "curl -o /data/events.json "
        "http://events_api:5000/events?"
        "start_date={{execution_date.strftime('%Y-%m-%d')}}&"
				-> Jinja 탬플릿으로 형식화된 execution_date 삽입
        "end_date={{next_execution_date.strftime('%Y-%m-%d')}}"
				-> next_execution_date 로 다음 실행 간격의 날짜를 정의
    ),
    dag=dag,
)

→ Jinja 탬플릿 구문을 사용하여 실행 날짜를 참조하고 datetime 의 stftime 매서드를 사용해 문자열 형식으로 반환 형식을 지정

**** jinja 는 무엇인가? (아래 링크 참고)**

01.Flask 기초 - Jinja template

 

01.Flask 기초 - Jinja template

🌈 Jinja template > ### 🔥 Jinja 템플릿이란? > ### 🔥 변수 사용하기 : {{변수명}} > ### 🔥 반복문 사용하기 : {% for %} ~ {% endfor %} > ### 🔥 조건문 사용하기 : {% if %} {% elif %

velog.io

  • Airflow 는 일반적인 날짜 형식에 대한 여러 유형의 축약 매개변수를 제공한다.
  • 예를 들어, ds 및 ds_nodash 매개변수는 각각 YYYY-MM-DD 및 YYYYMMDD 형으로된 execution_date 의 다른 표현
  • 마찬가지로, next_ds, next_ds_nodash, prev_ds 및 prev_ds_nodash 는 각각 다음 및 이전 실행 날짜에 대한 축약 표기법을 제공
  • 축약 표기법을 사용하여 다음과 같이 증분 데이터를 가져오는 명령 구문 작성 가능
# 템플릿에서 축약어 사용하기
fetch_events = BashOperator(
    task_id="fetch_events",
    bash_command=(
        "mkdir -p /data/events && "
        "curl -o /data/events.json "
        "http://events_api:5000/events?"
        "start_date={{ds}}&" <- ds 는 YYYYMM-DD 형식의 execution_date 를 제공
        "end_date={{next_ds}}" <- next_ds 는 next_execution_date 에 대해 동일하게 제공
    ),
    dag=dag,
)

→ 축약된 형식은 더 읽기 쉽지만, 좀 더 복잡한 날짜 형식의 경우에는 유연하게 표현할 수 있는

strftime 매서드 사용 가능

3.3 데이터 파티셔닝

  • 각각의 새로운 태스크가 전일의 데이터를 덮어쓰는 현상이 발생할 수 있음
  • 이 문제는 태스크의 출력을 해당 실행 날짜의 이름이 적힌 파일에 기록함으로써 데이터 세트를 일일 배치로 나누는 것임
import datetime as dt
from pathlib import Path

import pandas as pd

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="08_templated_path",
    schedule_interval="@daily",
    start_date=dt.datetime(year=2019, month=1, day=1),
    end_date=dt.datetime(year=2019, month=1, day=5),
)

fetch_events = BashOperator(
    task_id="fetch_events",
    bash_command=(
        "mkdir -p /data/events && "
        "curl -o /data/events/{{ds}}.json "
        "http://events_api:5000/events?"
        "start_date={{ds}}&"
        "end_date={{next_ds}}"
    ),
    dag=dag,
)

def _calculate_stats(**context):
    """Calculates event statistics."""
    input_path = context["templates_dict"]["input_path"]
    output_path = context["templates_dict"]["output_path"]

    events = pd.read_json(input_path)
    stats = events.groupby(["date", "user"]).size().reset_index()

    Path(output_path).parent.mkdir(exist_ok=True)
    stats.to_csv(output_path, index=False)

calculate_stats = PythonOperator(
    task_id="calculate_stats",
    python_callable=_calculate_stats,
    templates_dict={
        "input_path": "/data/events/{{ds}}.json", <- 반횐된 값이 템플릿 파일 이름에 기록
        "output_path": "/data/stats/{{ds}}.csv",
    },
    # Required in Airflow 1.10 to access templates_dict, deprecated in Airflow 2+.
    # provide_context=True,
    dag=dag,
)

fetch_events >> calculate_stats

→ 이를 통해 2022-01-01 실행날짜에 다운로드되는 모든 데이터가 data/events/2022-01-01.json 파일에 기록됨

  • 데이터를 더 작고 관리하기 쉬운 조각으로 나누는 작업은 데이터 저장 및 처리 시스템에서 일반적인 전략
  • 이러한 방법을 일반적으로 파티셔닝 partitioning 이라고 하며, 데이터 세트의 작은 부분을 파티션 이라고 한다
  • 실행 날짜별로 데이터 세트를 파티션하는 이점은 DAG 에서 아래 두번째 태스크(compute_stats) 를 고려할 때 분명해진다.
calculate_stats = PythonOperator(
    task_id="calculate_stats",
    python_callable=_calculate_stats,
    templates_dict={
        "input_path": "/data/events/events.json",
        "output_path": "/data/stats/stats.csv",
    },
  • 파티션된 데이터 세트를 사용하면 파티션된 이벤트 데이터와 출력 파일을 확인하도록 태스크의 입출력에 대한 경로를 변경하여 각 파티션에 대한 통계보다 효율적으로 계산 가능
calculate_stats = PythonOperator(
    task_id="calculate_stats",
    python_callable=_calculate_stats,
    templates_dict={
        "input_path": "/data/events/{{ds}}.json", <- 반횐된 값이 템플릿 파일 이름에 기록
        "output_path": "/data/stats/{{ds}}.csv",
    },
    # Required in Airflow 1.10 to access templates_dict, deprecated in Airflow 2+.
    # provide_context=True,
    dag=dag,
)

4. Airflow 의 실행날짜 이해

  • 이 부분은 매우 중요한 부분이므로 날짜가 어떻게 정의되었는지 확실히 이해해야 한다

4.1 고정된 스케줄 간격으로 태스크 실행

  • DAG 예약 위해 Airflow 는 세가지 매개변수(시작 날짜 / 스케줄 간격 / 종료 날짜) 를 사용해 시간을 스케줄 간격으로 나눔
  • 지정된 시작 날짜부터 시작하여 종료 날짜(선택 사항)에 종료

  • 간격 기반(interval-based) 의 시간 표현에서는 해당 간격의 시간 슬롯이 경과되자 마자 해당 간격 동안 DAG 가 실행
  • 간격 기반 접근방식을 사용하는 이점은 작업이 실행되는 시간 간격(시작 및 끝)을 정확하게 알고 있으므로 이전 장에서 본 증분 데이터 처리 유형을 수행하는데 적합
  • Airflow 의 시간 처리가 스케줄 간격에 따라 실행된다는 사실을 이해하면, Airflow 내에서 실행 날짜가 어떻게 정의되는지 이해하는데 도움
  • 대부분은 DAG 의 실행날짜가 2019-01-04 가 되어야 한다고 생각하지만 태스크가 실행될 때 execution_date 변수의 값을 보면 실제로 실행 날짜가 2019-01-03 인 것이 확인된다.
  • 이유는, Airflow 가 DAG 의 실행 날짜를 해당 간격의 시작 날짜로 정의하기 때문
  • 개념적으로 DAG 가 실제 실행되는 순간이 아니라 예약 간격을 표시한다고 생각하면 된다.
  • 실행날짜(execution_date) 라는 이름이 이에 대해 이해하는 것을 어렵게 만듬..

  • Airflow 실행날짜를 해당 스케줄 간격의 시작으로 정의하면 특정 간격의 시작과 끝을 도출하는데 사용할 수 있음
  • 예를들어, 작업을 실행할 때 해당 간격의 시작과 끝은 execution_date 및 next_execution 날짜 매개변수로 정의, 이전 스케줄 간격은 previous_execution_date 및 execution_date 매개변수를 사용해 확인 가능

  • 태스크에서 previous_execution_date 및 next_execution_date 매개변수를 사용할 때 주의해야할 사항은, 이러한 매개변수가 스케줄 간격 이후의 DAG 실행을 통해서만 정의된다는 것.

5. 과거의 데이터 간격을 메꾸기 위해 백필 사용하기

  • Airflow 를 사용하면 임의의 시작 날짜로부터 스케줄 간격을 정의할 수 있어 과가 간격을 정의 가능
  • 과거 데이터 세트를 로드하거나 분석하기 위해 DAG 의 과거 기록을 실행 할 수 있음 이를 백필 이라고 함

5.1 과거 시점의 작업 실행하기

  • Airflow 는 실행되지 않는 과거 스케줄 간격을 예약하고 실행한다.
  • 따라서, 과거 시작 날짜를 지정하고 해당 DAG 를 활성화하면 현재 시간이 실행되기 전에 경과된 모든 간격이 생성됨
  • 이 동작은 DAG 의 catchup 매개 변수에 의해 제어되며 False 로 설정하여 비활성화 가능
# 과거 시점의 태스크 실행을 피하기 위한 catchup 비활성화
dag = DAG(
    dag_id="09_no_catchup",
    schedule_interval="@daily",
    start_date=dt.datetime(year=2019, month=1, day=1),
    end_date=dt.datetime(year=2019, month=1, day=5),
    catchup=False,
)

→ 이 설정을 통해 가장 최근 스케줄 간격에 대해서만 실행 가능

  • 백필(backfill) 은 훌륭한 개념이지만 원천 시스템의 데이터 가용성에 따라 제한
  • 백필은 코드를 변경한 후 데이터를 다시 처리하는 데 사용할 수 있음
from datetime import datetime
from pathlib import Path

import pandas as pd
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="02_daily_schedule",
    schedule_interval="@daily",
    start_date=datetime(2019, 1, 1),
    end_date=datetime(2019, 1, 5),
)

fetch_events = BashOperator(
    task_id="fetch_events",
    bash_command=(
        "mkdir -p /data/events && "
        "curl -o /data/events.json http://events_api:5000/events"
    ),
    dag=dag,
)

def _calculate_stats(input_path, output_path):
    """Calculates event statistics."""

    events = pd.read_json(input_path)
    stats = events.groupby(["date", "user"]).size().reset_index()

    Path(output_path).parent.mkdir(exist_ok=True)
    stats.to_csv(output_path, index=False)

calculate_stats = PythonOperator(
    task_id="calculate_stats",
    python_callable=_calculate_stats,
    op_kwargs={"input_path": "/data/events.json", "output_path": "/data/stats.csv"},
    dag=dag,
)

fetch_events >> calculate_stats
반응형