카테고리 없음

[Airflow] 2. Airflow DAG 의 구조

쟈누이 2023. 11. 15. 00:00
반응형

2. 첫번째 Airflow DAG 작성

  • Airflow 는 하나 이상의 단계로 구성된 대규모 작업을 개별 태스크로 분할하고 DAG ( Directed Acyclic Graph) 로 형성 가능
  • 다중 태스크를 병렬로 실행할 수 있고 서로 다른 기술을 사용할 수 있음
import json
import pathlib

import airflow
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="listing_2_02",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None,
)

download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L '<https://ll.thespacedevs.com/2.0.0/launch/upcoming>'",  # noqa: E501
    dag=dag,
)

def _get_pictures():
    # Ensure directory exists
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)

    # Download all pictures in launches.json
    with open("/tmp/launches.json") as f:
        launches = json.load(f)
        image_urls = [launch["image"] for launch in launches["results"]]
        for image_url in image_urls:
            try:
                response = requests.get(image_url)
                image_filename = image_url.split("/")[-1]
                target_file = f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")

get_pictures = PythonOperator(
    task_id="get_pictures", python_callable=_get_pictures, dag=dag
)

notify = BashOperator(
    task_id="notify",
    bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
    dag=dag,
)

download_launches >> get_pictures >> notify
  • DAG 는 모든 워크플로의 시작
  • → 하나의 DAG 는 하나의 파이프라인
# DAG 객체 인스턴스 생성
dag = DAG(
    dag_id="listing_2_02",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None, # DAG 가 자동으로 실행되지 않음
)
  • dag 를 만들기에 앞서 가장 먼저 선언 하는 것
  • 특정 Task를 정의하고 실행해야 하는 Task 순서와 종류 그리고 진행 순서와 실행 빈도를 지정
# bash 커맨드 실행위해 BashOperator 객체 인스턴스 생성
download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L '<https://ll.thespacedevs.com/2.0.0/launch/upcoming>'",  # noqa: E501
    dag=dag,
)
  • 각가의 오퍼레이터는 하나의 태스크를 수행
  • 여러개의 오퍼레이터가 모여 하나의 워크플로(DAG) 를 구성
# 태스크 실행 순서 정의
download_launches >> get_pictures >> notify
  • 오퍼레이터는 서로 독립적으로 실행이 가능하지만, 순서를 정의해 실행 가능
  • → 이를 Airflow 에서는 의존성(dependency) 라고 한다.
  • 오른쪽 시프트 연산자 즉, “rshift”( >>) 를 사용하여 태스크 간의 의존성을 정의
  • 이를 통해 download_lanunches 이 성공적으로 완료된 후에만 get_pictures 태스크가 실행되고 get_pictures 가 성공적으로 완료된 후 notify 가 실행된다.
  • DAG 에 대한 사용법 및 내용은 아래 링크 참고( Turorial )

DAGs - Airflow Documentation

 

https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html

 

airflow.apache.org

2.1 DAG 와 오퍼레이터 차이점

  • Operator
    • 단일 작업 수행 역할(모두)
    • 몇몇 오퍼레이터는 BashOperator / PythonOperator / EmailOperator 또는 SimpleHTTPOperator 같이 좀 더 특수한 목적을 위해 사용
    • 사용자 관점에서 오퍼레이터와 태스크는 같은 의미이며 두 용어를 혼용해 사용
  • DAG
    • 오퍼레이터 집합에 대한 실행을 오케스트레이션(조정, 조율) 함
    • 오퍼레이터의 시작과 정지, 연속된 다음 태스크의 시작, 그리고 오퍼레이터 간의 의존성 보장이 포함

3. Airflow 에서 DAG 실행하기

  • Airflow 는 1. 스케줄러, 2. 웹 서버 및 3. 데이터베이스의 세 가지 핵심 컴포넌트로 구성

3.3 Airflow UI 둘러보기

→ Airflow 로그인 화면

→ Airflow 홈 화면

  • DAG 를 실행하려면 On 상태여야 한다.

→ Airflow 의 그래프 뷰 화면

  • Airflow 는 스크립트를 task 진행 순서 및 DAG 및 task 정보를 파악하여 UI 에 시각화

4. 스케쥴 간격으로 실행하기

  • Airflow 에서는 DAG 를 일정 시간 간격으로 실행할 수 있도록 스케쥴 설정이 가능
  • DAG 에서 schedule_interval 파라미터 설정
# 하루에 한 번 DAG 실행하기
dag = DAG(
    dag_id="listing_2_02",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval="@daily", 
)
  • 아래의 tree view는 graph view와 유사하지만 시간 경과에 따라 실행되는 그래프 구조를 표시
  • 단일 워크플로의 모든 실행 상태에 대한 개요를 볼 수 있음

5. 실패한 태스크에 대한 처리

  • 실패한 특정 태스크는 그래프 뷰와 트리뷰에 모두 빨간색으로 표시
  • 이전 태스크가 모두 성공해야 하며 실패한 태스크 이후 연속된 태스크는 실행되지 않는다.
  • 실패한 작업이 초기화된 후 Airflow 는 자동으로 태스크를 재시작한다.

2. 첫번째 Airflow DAG 작성

  • Airflow 는 하나 이상의 단계로 구성된 대규모 작업을 개별 태스크로 분할하고 DAG ( Directed Acyclic Graph) 로 형성 가능
  • 다중 태스크를 병렬로 실행할 수 있고 서로 다른 기술을 사용할 수 있음
import json
import pathlib

import airflow
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="listing_2_02",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None,
)

download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L '<https://ll.thespacedevs.com/2.0.0/launch/upcoming>'",  # noqa: E501
    dag=dag,
)

def _get_pictures():
    # Ensure directory exists
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)

    # Download all pictures in launches.json
    with open("/tmp/launches.json") as f:
        launches = json.load(f)
        image_urls = [launch["image"] for launch in launches["results"]]
        for image_url in image_urls:
            try:
                response = requests.get(image_url)
                image_filename = image_url.split("/")[-1]
                target_file = f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")

get_pictures = PythonOperator(
    task_id="get_pictures", python_callable=_get_pictures, dag=dag
)

notify = BashOperator(
    task_id="notify",
    bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
    dag=dag,
)

download_launches >> get_pictures >> notify
  • DAG 는 모든 워크플로의 시작
  • → 하나의 DAG 는 하나의 파이프라인
# DAG 객체 인스턴스 생성
dag = DAG(
    dag_id="listing_2_02",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None, # DAG 가 자동으로 실행되지 않음
)
  • dag 를 만들기에 앞서 가장 먼저 선언 하는 것
  • 특정 Task를 정의하고 실행해야 하는 Task 순서와 종류 그리고 진행 순서와 실행 빈도를 지정
# bash 커맨드 실행위해 BashOperator 객체 인스턴스 생성
download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L '<https://ll.thespacedevs.com/2.0.0/launch/upcoming>'",  # noqa: E501
    dag=dag,
)
  • 각가의 오퍼레이터는 하나의 태스크를 수행
  • 여러개의 오퍼레이터가 모여 하나의 워크플로(DAG) 를 구성
# 태스크 실행 순서 정의
download_launches >> get_pictures >> notify
  • 오퍼레이터는 서로 독립적으로 실행이 가능하지만, 순서를 정의해 실행 가능
  • → 이를 Airflow 에서는 의존성(dependency) 라고 한다.
  • 오른쪽 시프트 연산자 즉, “rshift”( >>) 를 사용하여 태스크 간의 의존성을 정의
  • 이를 통해 download_lanunches 이 성공적으로 완료된 후에만 get_pictures 태스크가 실행되고 get_pictures 가 성공적으로 완료된 후 notify 가 실행된다.
  • DAG 에 대한 사용법 및 내용은 아래 링크 참고( Turorial )

DAGs - Airflow Documentation

2.1 DAG 와 오퍼레이터 차이점

  • Operator
    • 단일 작업 수행 역할(모두)
    • 몇몇 오퍼레이터는 BashOperator / PythonOperator / EmailOperator 또는 SimpleHTTPOperator 같이 좀 더 특수한 목적을 위해 사용
    • 사용자 관점에서 오퍼레이터와 태스크는 같은 의미이며 두 용어를 혼용해 사용
  • DAG
    • 오퍼레이터 집합에 대한 실행을 오케스트레이션(조정, 조율) 함
    • 오퍼레이터의 시작과 정지, 연속된 다음 태스크의 시작, 그리고 오퍼레이터 간의 의존성 보장이 포함

3. Airflow 에서 DAG 실행하기

  • Airflow 는 1. 스케줄러, 2. 웹 서버 및 3. 데이터베이스의 세 가지 핵심 컴포넌트로 구성

3.3 Airflow UI 둘러보기

→ Airflow 로그인 화면

→ Airflow 홈 화면

  • DAG 를 실행하려면 On 상태여야 한다.

→ Airflow 의 그래프 뷰 화면

  • Airflow 는 스크립트를 task 진행 순서 및 DAG 및 task 정보를 파악하여 UI 에 시각화

4. 스케쥴 간격으로 실행하기

  • Airflow 에서는 DAG 를 일정 시간 간격으로 실행할 수 있도록 스케쥴 설정이 가능
  • DAG 에서 schedule_interval 파라미터 설정
# 하루에 한 번 DAG 실행하기
dag = DAG(
    dag_id="listing_2_02",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval="@daily", 
)
  • 아래의 tree view는 graph view와 유사하지만 시간 경과에 따라 실행되는 그래프 구조를 표시
  • 단일 워크플로의 모든 실행 상태에 대한 개요를 볼 수 있음

5. 실패한 태스크에 대한 처리

  • 실패한 특정 태스크는 그래프 뷰와 트리뷰에 모두 빨간색으로 표시
  • 이전 태스크가 모두 성공해야 하며 실패한 태스크 이후 연속된 태스크는 실행되지 않는다.
  • 실패한 작업이 초기화된 후 Airflow 는 자동으로 태스크를 재시작한다.

2. 첫번째 Airflow DAG 작성

  • Airflow 는 하나 이상의 단계로 구성된 대규모 작업을 개별 태스크로 분할하고 DAG ( Directed Acyclic Graph) 로 형성 가능
  • 다중 태스크를 병렬로 실행할 수 있고 서로 다른 기술을 사용할 수 있음
import json
import pathlib

import airflow
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="listing_2_02",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None,
)

download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L '<https://ll.thespacedevs.com/2.0.0/launch/upcoming>'",  # noqa: E501
    dag=dag,
)

def _get_pictures():
    # Ensure directory exists
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)

    # Download all pictures in launches.json
    with open("/tmp/launches.json") as f:
        launches = json.load(f)
        image_urls = [launch["image"] for launch in launches["results"]]
        for image_url in image_urls:
            try:
                response = requests.get(image_url)
                image_filename = image_url.split("/")[-1]
                target_file = f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")

get_pictures = PythonOperator(
    task_id="get_pictures", python_callable=_get_pictures, dag=dag
)

notify = BashOperator(
    task_id="notify",
    bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
    dag=dag,
)

download_launches >> get_pictures >> notify
  • DAG 는 모든 워크플로의 시작
  • → 하나의 DAG 는 하나의 파이프라인
# DAG 객체 인스턴스 생성
dag = DAG(
    dag_id="listing_2_02",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None, # DAG 가 자동으로 실행되지 않음
)
  • dag 를 만들기에 앞서 가장 먼저 선언 하는 것
  • 특정 Task를 정의하고 실행해야 하는 Task 순서와 종류 그리고 진행 순서와 실행 빈도를 지정
# bash 커맨드 실행위해 BashOperator 객체 인스턴스 생성
download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L '<https://ll.thespacedevs.com/2.0.0/launch/upcoming>'",  # noqa: E501
    dag=dag,
)
  • 각가의 오퍼레이터는 하나의 태스크를 수행
  • 여러개의 오퍼레이터가 모여 하나의 워크플로(DAG) 를 구성
# 태스크 실행 순서 정의
download_launches >> get_pictures >> notify
  • 오퍼레이터는 서로 독립적으로 실행이 가능하지만, 순서를 정의해 실행 가능
  • → 이를 Airflow 에서는 의존성(dependency) 라고 한다.
  • 오른쪽 시프트 연산자 즉, “rshift”( >>) 를 사용하여 태스크 간의 의존성을 정의
  • 이를 통해 download_lanunches 이 성공적으로 완료된 후에만 get_pictures 태스크가 실행되고 get_pictures 가 성공적으로 완료된 후 notify 가 실행된다.
  • DAG 에 대한 사용법 및 내용은 아래 링크 참고( Turorial )

DAGs - Airflow Documentation

 

https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html

 

airflow.apache.org

2.1 DAG 와 오퍼레이터 차이점

  • Operator
    • 단일 작업 수행 역할(모두)
    • 몇몇 오퍼레이터는 BashOperator / PythonOperator / EmailOperator 또는 SimpleHTTPOperator 같이 좀 더 특수한 목적을 위해 사용
    • 사용자 관점에서 오퍼레이터와 태스크는 같은 의미이며 두 용어를 혼용해 사용
  • DAG
    • 오퍼레이터 집합에 대한 실행을 오케스트레이션(조정, 조율) 함
    • 오퍼레이터의 시작과 정지, 연속된 다음 태스크의 시작, 그리고 오퍼레이터 간의 의존성 보장이 포함

3. Airflow 에서 DAG 실행하기

  • Airflow 는 1. 스케줄러, 2. 웹 서버 및 3. 데이터베이스의 세 가지 핵심 컴포넌트로 구성

3.3 Airflow UI 둘러보기

→ Airflow 로그인 화면

→ Airflow 홈 화면

  • DAG 를 실행하려면 On 상태여야 한다.

→ Airflow 의 그래프 뷰 화면

  • Airflow 는 스크립트를 task 진행 순서 및 DAG 및 task 정보를 파악하여 UI 에 시각화

4. 스케쥴 간격으로 실행하기

  • Airflow 에서는 DAG 를 일정 시간 간격으로 실행할 수 있도록 스케쥴 설정이 가능
  • DAG 에서 schedule_interval 파라미터 설정
# 하루에 한 번 DAG 실행하기
dag = DAG(
    dag_id="listing_2_02",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval="@daily", 
)
  • 아래의 tree view는 graph view와 유사하지만 시간 경과에 따라 실행되는 그래프 구조를 표시
  • 단일 워크플로의 모든 실행 상태에 대한 개요를 볼 수 있음

5. 실패한 태스크에 대한 처리

  • 실패한 특정 태스크는 그래프 뷰와 트리뷰에 모두 빨간색으로 표시
  • 이전 태스크가 모두 성공해야 하며 실패한 태스크 이후 연속된 태스크는 실행되지 않는다.
  • 실패한 작업이 초기화된 후 Airflow 는 자동으로 태스크를 재시작한다.

2. 첫번째 Airflow DAG 작성

  • Airflow 는 하나 이상의 단계로 구성된 대규모 작업을 개별 태스크로 분할하고 DAG ( Directed Acyclic Graph) 로 형성 가능
  • 다중 태스크를 병렬로 실행할 수 있고 서로 다른 기술을 사용할 수 있음
import json
import pathlib

import airflow
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="listing_2_02",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None,
)

download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L '<https://ll.thespacedevs.com/2.0.0/launch/upcoming>'",  # noqa: E501
    dag=dag,
)

def _get_pictures():
    # Ensure directory exists
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)

    # Download all pictures in launches.json
    with open("/tmp/launches.json") as f:
        launches = json.load(f)
        image_urls = [launch["image"] for launch in launches["results"]]
        for image_url in image_urls:
            try:
                response = requests.get(image_url)
                image_filename = image_url.split("/")[-1]
                target_file = f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")

get_pictures = PythonOperator(
    task_id="get_pictures", python_callable=_get_pictures, dag=dag
)

notify = BashOperator(
    task_id="notify",
    bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
    dag=dag,
)

download_launches >> get_pictures >> notify
  • DAG 는 모든 워크플로의 시작
  • → 하나의 DAG 는 하나의 파이프라인
# DAG 객체 인스턴스 생성
dag = DAG(
    dag_id="listing_2_02",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None, # DAG 가 자동으로 실행되지 않음
)
  • dag 를 만들기에 앞서 가장 먼저 선언 하는 것
  • 특정 Task를 정의하고 실행해야 하는 Task 순서와 종류 그리고 진행 순서와 실행 빈도를 지정
# bash 커맨드 실행위해 BashOperator 객체 인스턴스 생성
download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L '<https://ll.thespacedevs.com/2.0.0/launch/upcoming>'",  # noqa: E501
    dag=dag,
)
  • 각가의 오퍼레이터는 하나의 태스크를 수행
  • 여러개의 오퍼레이터가 모여 하나의 워크플로(DAG) 를 구성
# 태스크 실행 순서 정의
download_launches >> get_pictures >> notify
  • 오퍼레이터는 서로 독립적으로 실행이 가능하지만, 순서를 정의해 실행 가능
  • → 이를 Airflow 에서는 의존성(dependency) 라고 한다.
  • 오른쪽 시프트 연산자 즉, “rshift”( >>) 를 사용하여 태스크 간의 의존성을 정의
  • 이를 통해 download_lanunches 이 성공적으로 완료된 후에만 get_pictures 태스크가 실행되고 get_pictures 가 성공적으로 완료된 후 notify 가 실행된다.
  • DAG 에 대한 사용법 및 내용은 아래 링크 참고( Turorial )

DAGs - Airflow Documentation

2.1 DAG 와 오퍼레이터 차이점

  • Operator
    • 단일 작업 수행 역할(모두)
    • 몇몇 오퍼레이터는 BashOperator / PythonOperator / EmailOperator 또는 SimpleHTTPOperator 같이 좀 더 특수한 목적을 위해 사용
    • 사용자 관점에서 오퍼레이터와 태스크는 같은 의미이며 두 용어를 혼용해 사용
  • DAG
    • 오퍼레이터 집합에 대한 실행을 오케스트레이션(조정, 조율) 함
    • 오퍼레이터의 시작과 정지, 연속된 다음 태스크의 시작, 그리고 오퍼레이터 간의 의존성 보장이 포함

3. Airflow 에서 DAG 실행하기

  • Airflow 는 1. 스케줄러, 2. 웹 서버 및 3. 데이터베이스의 세 가지 핵심 컴포넌트로 구성

3.3 Airflow UI 둘러보기

→ Airflow 로그인 화면

→ Airflow 홈 화면

  • DAG 를 실행하려면 On 상태여야 한다.

→ Airflow 의 그래프 뷰 화면

  • Airflow 는 스크립트를 task 진행 순서 및 DAG 및 task 정보를 파악하여 UI 에 시각화

4. 스케쥴 간격으로 실행하기

  • Airflow 에서는 DAG 를 일정 시간 간격으로 실행할 수 있도록 스케쥴 설정이 가능
  • DAG 에서 schedule_interval 파라미터 설정
# 하루에 한 번 DAG 실행하기
dag = DAG(
    dag_id="listing_2_02",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval="@daily", 
)
  • 아래의 tree view는 graph view와 유사하지만 시간 경과에 따라 실행되는 그래프 구조를 표시
  • 단일 워크플로의 모든 실행 상태에 대한 개요를 볼 수 있음

5. 실패한 태스크에 대한 처리

  • 실패한 특정 태스크는 그래프 뷰와 트리뷰에 모두 빨간색으로 표시
  • 이전 태스크가 모두 성공해야 하며 실패한 태스크 이후 연속된 태스크는 실행되지 않는다.
  • 실패한 작업이 초기화된 후 Airflow 는 자동으로 태스크를 재시작한다.

2. 첫번째 Airflow DAG 작성

  • Airflow 는 하나 이상의 단계로 구성된 대규모 작업을 개별 태스크로 분할하고 DAG ( Directed Acyclic Graph) 로 형성 가능
  • 다중 태스크를 병렬로 실행할 수 있고 서로 다른 기술을 사용할 수 있음
import json
import pathlib

import airflow
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="listing_2_02",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None,
)

download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L '<https://ll.thespacedevs.com/2.0.0/launch/upcoming>'",  # noqa: E501
    dag=dag,
)

def _get_pictures():
    # Ensure directory exists
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)

    # Download all pictures in launches.json
    with open("/tmp/launches.json") as f:
        launches = json.load(f)
        image_urls = [launch["image"] for launch in launches["results"]]
        for image_url in image_urls:
            try:
                response = requests.get(image_url)
                image_filename = image_url.split("/")[-1]
                target_file = f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")

get_pictures = PythonOperator(
    task_id="get_pictures", python_callable=_get_pictures, dag=dag
)

notify = BashOperator(
    task_id="notify",
    bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
    dag=dag,
)

download_launches >> get_pictures >> notify
  • DAG 는 모든 워크플로의 시작
  • → 하나의 DAG 는 하나의 파이프라인
# DAG 객체 인스턴스 생성
dag = DAG(
    dag_id="listing_2_02",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None, # DAG 가 자동으로 실행되지 않음
)
  • dag 를 만들기에 앞서 가장 먼저 선언 하는 것
  • 특정 Task를 정의하고 실행해야 하는 Task 순서와 종류 그리고 진행 순서와 실행 빈도를 지정
# bash 커맨드 실행위해 BashOperator 객체 인스턴스 생성
download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L '<https://ll.thespacedevs.com/2.0.0/launch/upcoming>'",  # noqa: E501
    dag=dag,
)
  • 각가의 오퍼레이터는 하나의 태스크를 수행
  • 여러개의 오퍼레이터가 모여 하나의 워크플로(DAG) 를 구성
# 태스크 실행 순서 정의
download_launches >> get_pictures >> notify
  • 오퍼레이터는 서로 독립적으로 실행이 가능하지만, 순서를 정의해 실행 가능
  • → 이를 Airflow 에서는 의존성(dependency) 라고 한다.
  • 오른쪽 시프트 연산자 즉, “rshift”( >>) 를 사용하여 태스크 간의 의존성을 정의
  • 이를 통해 download_lanunches 이 성공적으로 완료된 후에만 get_pictures 태스크가 실행되고 get_pictures 가 성공적으로 완료된 후 notify 가 실행된다.
  • DAG 에 대한 사용법 및 내용은 아래 링크 참고( Turorial )

DAGs - Airflow Documentation

2.1 DAG 와 오퍼레이터 차이점

  • Operator
    • 단일 작업 수행 역할(모두)
    • 몇몇 오퍼레이터는 BashOperator / PythonOperator / EmailOperator 또는 SimpleHTTPOperator 같이 좀 더 특수한 목적을 위해 사용
    • 사용자 관점에서 오퍼레이터와 태스크는 같은 의미이며 두 용어를 혼용해 사용
  • DAG
    • 오퍼레이터 집합에 대한 실행을 오케스트레이션(조정, 조율) 함
    • 오퍼레이터의 시작과 정지, 연속된 다음 태스크의 시작, 그리고 오퍼레이터 간의 의존성 보장이 포함

3. Airflow 에서 DAG 실행하기

  • Airflow 는 1. 스케줄러, 2. 웹 서버 및 3. 데이터베이스의 세 가지 핵심 컴포넌트로 구성

3.3 Airflow UI 둘러보기

→ Airflow 로그인 화면

→ Airflow 홈 화면

  • DAG 를 실행하려면 On 상태여야 한다.

→ Airflow 의 그래프 뷰 화면

  • Airflow 는 스크립트를 task 진행 순서 및 DAG 및 task 정보를 파악하여 UI 에 시각화

4. 스케쥴 간격으로 실행하기

  • Airflow 에서는 DAG 를 일정 시간 간격으로 실행할 수 있도록 스케쥴 설정이 가능
  • DAG 에서 schedule_interval 파라미터 설정
# 하루에 한 번 DAG 실행하기
dag = DAG(
    dag_id="listing_2_02",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval="@daily", 
)
  • 아래의 tree view는 graph view와 유사하지만 시간 경과에 따라 실행되는 그래프 구조를 표시
  • 단일 워크플로의 모든 실행 상태에 대한 개요를 볼 수 있음

5. 실패한 태스크에 대한 처리

  • 실패한 특정 태스크는 그래프 뷰와 트리뷰에 모두 빨간색으로 표시
  • 이전 태스크가 모두 성공해야 하며 실패한 태스크 이후 연속된 태스크는 실행되지 않는다.
  • 실패한 작업이 초기화된 후 Airflow 는 자동으로 태스크를 재시작한다.
반응형