Apache Airflow의 AWS ECS Executor로 워크플로우 실행하기
지난 포스팅에서 에어플로우의 실행기중 하나인 AWS ECS Executor에 대해 소개드렸습니다.
무거운 라이브러리를 사용하거나, 수 시간이 걸리는 대규모 ETL 작업을 처리해야 할 때, ECS Fargate 기반의 워커는 자원의 제약으로부터 비교적 자유롭고 독립된 작업 실행 환경을 제공합니다.
하지만 자유도가 높은 만큼 구현 과정은 조금 더 세밀한 설정을 요구합니다. ECS 클러스터를 설계하고, 적절한 태스크 정의를 정의하며 Airflow 스케줄러가 ECS API를 사용하여 클러스터를 원활하게 통제할 수 있도록 보안 그룹 및 IAM 역할을 설정해야합니다.
이번 포스팅에선 ECS Executor의 실전 구축에 집중합니다. ECS Executor를 도입하며 겪었던 시행착오들을 바탕으로, 인프라 프로비저닝부터 도커 이미지 빌드, 에어플로우 설정 최적화 등의 가이드라인을 제시합니다.
AWS 인프라 및 권한 설정
ECS Executor를 사용하는 Airflow 스케줄러의 IAM 역할에 다음 권한을 추가해야합니다.
- ecs:RunTask
- ecs:DescribeTasks
- ecs:StopTask
- iam:PassRole
Fargate는 인터넷 또는 AWS ECR로부터 이미지를 가져오거나 DB에 접속할 수 있어야 합니다.
이 포스팅에서는 AWS ECR에 도커 이미지를 구성하는 방법으로 구현합니다.
AWS 관리 콘솔에서 다음을 생성합니다.
- ECR 리포지토리 생성
- ECS 클러스터 생성
에어플로우 설정 및 이미지 빌드
AIRFLOW__CORE__EXECUTOR 에 airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor 추가(apache-airflow-providers-amazon 패키지 설치 필요)
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
– 메타 DB 연결 (postgresql+psycopg2://<username>:<password>@<endpoint>/<database_name>)
AIRFLOW__AWS_ECS_EXECUTOR__CLUSTER
– ECS 클러스터 이름
AIRFLOW__AWS_ECS_EXECUTOR__REGION_NAME
– ECS 클러스터 리전명
AIRFLOW__AWS_ECS_EXECUTOR__TASK_DEFINITION
– 태스크 정의 이름
AIRFLOW__AWS_ECS_EXECUTOR__LAUNCH_TYPE
– 태스크 실행 타입, ‘FARGATE’로 사용
AIRFLOW__AWS_ECS_EXECUTOR__PLATFORM_VERSION
– 태스크 정의 버전, ‘LATEST’ 사용
AIRFLOW__AWS_ECS_EXECUTOR__SUBNETS
– 태스크 Fargate 컨테이너가 올라갈 서브넷 ID 목록, ‘,’로 구분하여 입력
AIRFLOW__AWS_ECS_EXECUTOR__SECURITY_GROUPS
– 태스크에 할당할 보안 그룹 ID 목록, ‘,’로 구분하여 입력
AIRFLOW__AWS_ECS_EXECUTOR__ASSIGN_PUBLIC_IP
– 태스크에 퍼블릭 IP 사용 여부, 프라이빗 서브넷의 경우 False
기본적으로 이미지는 스케줄러가 사용하는 이미지와 동일하게 빌드합니다.
예제 Dockerfile 에서 Dockerfile 예시를 확인할 수 있습니다.
ECS 태스크 정의 생성
ECR에 도커 이미지를 빌드했다면 AWS ECS 관리 콘솔에서 태스크 정의를 생성합니다.
시작 유형은 AWS Fargate만 선택하고 알맞은 아키텍처를 선택한 후 워크로드에 따라 태스크 역할(태스크가 할당받는 IAM 역할)과 태스크 실행 역할(태스크를 시작하기위해 필요한 역할, Executor가 필요한 역할)을 선택합니다.
필수 컨테이너 하나에 ECR에 푸시된 에어플로우 이미지를 선택합니다. 필요하다면 추가로 환경 변수 또는 볼륨을 추가합니다.
DAG 작성 및 실행
import pendulum
from airflow import DAG
from airflow.decorators import task
with DAG(
dag_id="FARGATE-DAG",
tags=["FARGATE"],
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
schedule=None,
catchup=False,
default_args={
'owner': 'user',
'depends_on_past': False,
'retries': 0,
},
) as dag:
@task(
executor="airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor",
)
def exec_task(**kwargs):
print("Task execution")
result = exec_task()
executor 파라미터를 명시하여 해당 Executor를 통해 태스크를 실행시킬 수 있습니다.
지금까지 총 2편에 걸쳐 AWS Lambda와 AWS ECS Executor라는 두 가지 강력한 무기를 모두 살펴보았습니다.
가벼운 API 호출이나 단순 변환은 Lambda Executor로 가성비를 챙기고, 묵직한 ETL과 연산 작업은 오늘 배운 ECS Executor로 안정성을 확보하는 **’하이브리드 아키텍처’**를 구성해 보세요.
인프라를 관리하는 시간은 줄이고, 데이터를 분석하고 가치를 만드는 시간은 늘리는 것. 그것이 바로 우리가 오케스트레이션 툴을 사용하는 궁극적인 이유일 것입니다. 오늘 이 포스팅이 여러분의 워크플로우를 한 단계 더 진화시키는 계기가 되었기를 바랍니다.