Airflow Dynamic Task Mapping: 반복 작업을 동적으로 처리하는 법
데이터 엔지니어링 파이프라인을 구축하다 보면, 동일한 작업을 여러 입력값에 대해 반복적으로 실행해야 하는 경우가 많습니다. 예를 들어, 매일 처리해야하는 태스크 숫자가 바뀌거나, 외부 트리거로 인해 실행 되었을때 실행할 태스크 숫자가 동적일 경우가 있습니다.
이전에는 이러한 작업을 위해 for
루프를 사용하여 태스크 내부에서 처리하거나 , 고정된 숫자만큼의 태스크를 실행했었죠. 이 방식은 DAG의 복잡성을 높이고 유연성을 떨어뜨리는 단점이 있었습니다.
Airflow 2.3부터 도입된 동적 태스크 매핑(Dynamic Task Mapping) 기능은 이러한 문제를 우아하게 해결합니다. 이 기능을 사용하면 DAG 실행 시점에 동적으로 태스크를 생성하여, 가변적인 입력에 대응하는 유연하고 효율적인 파이프라인을 구축할 수 있습니다.
Dynamic Task Mapping이란?
동적 태스크 매핑은 간단히 말해, 하나의 태스크 정의를 사용하여 런타임에 여러 개의 태스크 인스턴스를 생성하는 기능입니다. 마치 map
함수처럼 입력 리스트나 딕셔너리의 각 항목에 대해 동일한 태스크를 병렬로 실행할 수 있게 해주는 것이죠.
기존의 정적 for
루프 방식과 가장 큰 차이점은 태스크 생성 시점입니다. 정적 방식은 DAG 파일을 파싱하는 시점에 모든 태스크를 미리 생성하지만, 동적 매핑은 실제 DAG가 실행되는 시점(런타임)에 필요한 만큼만 태스크를 생성합니다. 이로 인해 다음과 같은 장점을 얻을 수 있습니다.
-
유연성 향상: 실행 시점의 데이터나 환경에 따라 처리해야 할 작업의 수가 달라지는 경우에 매우 유용합니다. 예를 들어, S3 버킷에 새로 추가된 파일의 개수만큼 동적으로 처리 태스크를 생성할 수 있습니다.
-
DAG 가독성 및 유지보수성 증가:
for
루프로 인해 지저분해졌던 DAG 코드가 훨씬 간결하고 명확해집니다. -
성능 최적화: 스케줄러의 부담을 줄여줍니다. DAG 파싱 시간에는 하나의 태스크만 인식하고, 실행 시에만 여러 인스턴스로 확장되므로 대규모 병렬 처리에 더 효율적입니다.
핵심 기능: expand()
와 partial()
동적 태스크 매핑은 주로 @task
데코레이터와 함께 사용되는 expand()
함수를 통해 구현됩니다.
expand()
: 기본 동적 매핑
expand()
함수는 리스트나 딕셔너리와 같은 반복 가능한(iterable) 객체를 인자로 받아, 각 요소에 대한 태스크를 생성합니다.
가장 기본적인 예시를 살펴봅시다. 여러 숫자에 각각 1을 더하는 작업을 동적으로 처리하는 DAG입니다.
from __future__ import annotations import pendulum from airflow.decorators import dag, task @dag( dag_id=”dynamic_task_mapping_basic_example”, start_date=pendulum.datetime(2025, 1, 1, tz=”Asia/Seoul”), schedule=None, catchup=False, ) def dynamic_task_mapping_basic_example(): @task def add_one(x: int): return x + 1 @task def summarize(values: list): total = sum(values) print(f”Total was {total}”) added_values = add_one.expand(x=[1, 2, 3]) summarize(added_values) dynamic_task_mapping_basic_example() |
위 코드에서 add_one.expand(x=[1, 2, 3])
부분은 add_one(x=1)
, add_one(x=2)
, add_one(x=3)
이라는 세 개의 태스크 인스턴스를 런타임에 생성합니다. Airflow UI에서는 이들이 매핑된(mapped) 태스크로 표시되며, 각 인스턴스의 로그와 성공 여부를 개별적으로 확인할 수 있습니다.
partial()
: 고정 인자와 함께 사용하기
만약 여러 태스크에 동일한 인자를 전달하면서 특정 인자만 동적으로 변경하고 싶다면 partial()
함수를 함께 사용할 수 있습니다.
from __future__ import annotations import pendulum from airflow.decorators import dag, task @dag( dag_id=“dynamic_task_mapping_with_partial_example, start_date=pendulum.datetime(2025, 1, 1, tz=“Asia/Seoul”), schedule=None, catchup=False, ) def dynamic_task_mapping_with_partial_example():
@task def multiply(x: int, y: int): return x * y
multiplied_values = multiply.partial(y=10).expand(x=[2, 3, 5])
dynamic_task_mapping_with_partial_example() |
partial(y=10)
을 통해 y
라는 인자는 10
으로 고정되고, expand(x=[2, 3, 5])
를 통해 x
값이 동적으로 매핑되어 태스크가 실행됩니다.
Airflow의 동적 태스크 매핑은 현대 데이터 파이프라인의 복잡성과 가변성에 대응하기 위한 필수적인 기능입니다. 반복적인 작업을 간결하고 효율적으로 처리할 수 있게 함으로써, 데이터 엔지니어가 더 중요한 비즈니스 로직에 집중할 수 있도록 돕습니다.
아직 동적 태스크 매핑을 사용해보지 않았다면, 꼭 한번 활용해 보시길 바랍니다. 복잡한 for
루프에서 벗어나 훨씬 깔끔하고 강력한 데이터 파이프라인을 구축하는 경험을 하게 될 것입니다.