Apache Airflow 2.10.0 (2024-08-15)
Apache Airflow는 2.0 이후로 많은 핵심적인 업데이트를 거쳤습니다.
2.0에서는 DAG 동시성 향상과 REST API의 공식 지원이 도입되었고, Scheduler의 성능이 크게 개선되었습니다.
2.2에서는 Task Group의 기능이 확장되었고, Smart Sensors가 추가되어 센서 작업의 효율이 높아졌습니다.
2.4부터는 DAG 파라미터 기능이 생겨 유연한 파라미터 설정이 가능해졌으며, 2.6에서는 새로운 UI 개선과 Trigger 기능이 강화되었습니다.
2024년 8월 15일에 Airflow는 2.10.0 버전을 릴리즈하였습니다.
데이터 관리를 위한 Data-aware Scheduling과 더 다양한 커넥터 지원이 추가되어 사용성이 크게 향상되었으며 Executor를 여러개를 정의해두고, 상황마다 적절한 Executor를 사용할 수 있도록 해주는 Hybrid Executor 기능이 추가되었습니다.
Airflow에서는 provider 모듈을 통해 여러 종류의 Executor를 지원합니다. 각각의 Executor는 컴퓨팅 효율과 실행 시간, 실행 환경 격리 등에 있어 저마다의 장단점을 가지고 있었습니다. 그리고 기존까지의 Airflow는 이중 하나의 Executor만 사용이 가능했습니다.
하지만 2.10.0버전 이후로 Hybrid Executor 기능을 통해 각 DAG내 Task들을 다른 Executor로 실행할 수 있도록 구성할 수 있게 되었습니다.
이번 글에서는 Hybrid Executor를 통해 DAG 내 Task들을 각각 다른 Executor로 실행하는 방법 및 사용 예시를 소개합니다.
구성 방법
Airflow의 설정 파일인 airflow.cfg
파일 내용중 core 섹션의 executor 의 값을 수정합니다.
|
기본 설정은 위와 같으며 모든 Task를 LocalExecutor로 실행하게됩니다.
|
위는 LocalExecutor와 CeleryExecutor 두 Executor를 사용하도록 정의한 설정 예시입니다.
(각 Executor는 콤마(,)로 구분하여 작성하며 이름은 실제로 Airflow 모듈 내 Executor 추상 클래스의 구현체 클래스명을 작성해야합니다.)
적용
dag.py
with DAG( dag_id="TEST-DAG", start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"), schedule=None, catchup=False, ) as dag: @task def task_local_exec(): ... @task(executor="CeleryExecutor") def task_celery_exec(): ...
task_local_exec >> task_celery_exec |
위에서 설정한 Hybrid Executor를 사용한 샘플 DAG 코드입니다.
|
위와 같이 설정했을 경우 맨 첫번쨰 Executor가 기본 Executor가 됩니다.
따라서 task_local_exec Task는 LocalExecutor에 의해 실행됩니다.
task_celery_exec Task는 파라미터에 executor="CeleryExecutor"
라고 정의했습니다.
따라서 airflow.cfg의 core.executor에 지정된 CeleryExecutor에 의해 실행됩니다.
사용 사례
|
apache-airflow-providers-amazon 모듈에서 제공하는 EcsExecutor를 커스텀하여 사용중인 AwsFargateExecutor와 LocalExecutor를 사용합니다. AwsFargateExecutor는 task를 각각 독립된 AWS Fargate 컨테이너에서 실행시키는 Executor입니다.
dag.py
import pendulum from datetime import timedelta from airflow import DAG from airflow.decorators import task from airflow.models import DagRun with DAG( dag_id="DAG-Sample", start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"), schedule=timedelta(minutes=1), catchup=False, default_args={ 'depends_on_past': False, 'retries': 0, }, ) as dag: @task(executor="LocalExecutor") def get_tasks(**kwargs): ... @task( max_active_tis_per_dag=100, executor_config={"overrides": {"cpu": "1024", "memory": "4096"}}, map_index_template="{{ mapping_map_index }}", ) def execute_task(**kwargs): ... tasks=get_tasks() execute_task.expand(task=tasks) |
위 DAG는 1분마다 실행하여 get_tasks를 통해 실행해야할 작업을 얻고 Dynamic Task인 executor_task를 통해 작업 개수만큼의 TaskInstance를 생성하여 작업을 실행하게됩니다.
execute_task Task는 여러 작업이 예약될 경우 병렬로 여러 작업을 동시에 처리하게되는데, 이때 TaskInstance마다 별도의 Fargate 컨테이너에서 작업을 실행하여 독립된 실행 환경을 보장하고 설정을 통해 손쉽게 컴퓨팅 용량을 조절할 수 있습니다.
반면 get_tasks는 AwsFargateExecutor로 실행될 경우 1분마다 Airflow 이미지를 pull해야하는데, 해당 이미지의 크기가 커서 많은 네트워크 트래픽이 발생하게 됩니다. 또한 해당 작업은 스토리지로부터 실행할 작업 목록을 가져오기만 하면 되는 Task이므로 LocalExecutor를 선택하였습니다.
아래 Airflow Release Note를 통해 더 많은 변경사항을 확인해보세요!
https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#airflow-2-10-0-2024-08-15