Airflow는 배치성으로 처리가 필요한 워크플로우를 개발하고 스케쥴링하고 모니터링하는 오케스트레이션 플랫폼이다. 스트리밍 솔루션이 아니기 때문에 계속 돌아가는 이벤트기반 워크플로우를 위해 설계되지 않았다. 보통은 Kafka를 통해서 실시간 데이터를 입수 및 처리해서 스토리지에 쌓고 Airflow가 주기적으로 데이터 처리를 시작한다.
이번 포스트에서는 Airflow의 기본 개념과 중요한 구성 요소들을 간단하게 소개한다. 스크립트 작성보다 기존에 사용하던 Airflow의 기본 철학을 이해하고 DAG를 구성하면서 사용했던 인자들을 복습하는 장이다.
Workflow
airflow에서는 워크플로우를 코드로 정의할 수 있다는 특징을 가진다. 이런 특성은 다음 목적을 위해 구현된다.
- Dynamic: 파이썬 코드로 파이프라인 정의
- Extensible: 오퍼레이터를 지원하여 모든 컴포넌트들은 쉽게 환경에 맞춰 다양한 기술스택을 활용할 수 있다.
- Flexible: jinja 템플릿 엔진이 내장되어 있어 워크플로우에서 필요한 파라미터 값을 자동으로 받을 수 있다.워크플로우가 명확한 시작, 종료 시점이 있고 주기적으로 실행되어야 하다면 Airflow DAG으로 처리할 수 있다.
공식 문서 overview 결말부에서는 코딩 없이 클릭만으로 데이터 플로우 처리, 관리를 하고싶은 경우에는 적합하지 않다고 당부한다. 해당 프레임워크에서는 지속적으로 개발자 편리를 높이는 방향으로 개선하지만 기본 철학은 항상 워크플로우는 코딩으로 정의하는 것으로 요구된다.
1. DAG 정의
하나의 스크립트로 정의된 workflow는 DAG (Directed Acyclic Graph)으로 파이프라인을 정의된다.
DAG 정의 파일에서 실질적인 데이터 처리 작업이 이루어지는 것은 아니고 스케쥴러에서 이 스크립트 변경 사항을 반영해 주기적으로 실행한다.
1). Arguments
다음은 DAG 정의에서 기본적으로 설정하는 파라미터들 값을 명시하여 태스크 실행에 필요한 값을 설정한다.
parameter | description | example | |
dag_id | DAG명 | dag_id='tutorial' | |
default_args | 각 오퍼레이터 태스크에 명시된 값 전달 | default_args = { 'start_date': dateimte(2024, 1, 1) 'retries': 3, 'retry_delay': timedelta(minutes=5), ... |
각 태스크가 해당 설정값을 상속받아 갖게 되며 특정 태스크에서는 다른 설정값이 필요하다면 override하면 된다. |
schedule | DAG 실행 주기 (과거 용어: |
schedule='0 9 * * *' -> 매일 9시 정각 실행 schedule='0 0 * * 0' -> 월요일 정각 실행 |
|
catchup | start_date~실행시점까지 지난 실행도 실행할건지 여부 | catchup=True | |
max_active_runs | 동시에 실행될 수 있는 dag_run 개수. 설정된 수를 초과하지 않도록 스케쥴러는 새로운 해당 dag_run을 생성하지 않느다. |
max_active_runs=2 (default: 16) | 만약 병렬로 실행되는 3개 태스크를 가지고 있는 DAG의 max_active_tasks=6인 상태에서 backfill이나 catchup시 max_active_runs=(max_active_tasks / tasks)+1로 설정하면 앞선 태스크가 끝나는 대로 다음 dag_run에서 태스크를 실행해두고 있을 수 있어 2로 설정했을때보다 시간을 단축할 수 있다. |
max_active_tasks | 동시에 실행될 수 있는 task 개수 (과거 용어: |
max_active_tasks=6 (default: 16) | max_active_tasks_per_dag |
on_failure_callback | dag_run 실행 실패시 호출 콜백 지정 | on_failure_callback=send_message |
이외에도 다양한 파라미터 설정이 지원된다. (공식문서)
2) default_args 파라미터
DAG 내 모든 태스크에 적용될 기본 인수들 딕셔너리 형태로 설정한다.
key | description | |
start_date | interval 반영되는 시작일 ⭐️ DAG first execution date = starte_date + interval |
|
end_date | DAG 종료일 이 날짜 이후에 DAG 실행이 예약되지 않는다. |
|
retries | 태스크 실패시 자동 재실행 횟수 | |
retry_delay | 재시도 간격 시간 | |
depends_on_past | 태스크가 자신의 직전 실행에 의존할지 여부
|
2. Task 정의
태스크를 정의하는 스타일은 크게 두 가지이다.
- Operator로 태스크를 생성하는 방식 (ex. PythonOperator, BashOperator, KubernetesPodOperator...)
- TaskFlow로 태스크를 생성하는 방식 (ex. @task, )
태스크 오퍼레이터에서는 다음 우선순위로 파라미터 값을 가져온다.
- 태스크 선언에서 명시적으로 전달된 인자값
- DAG 선언에서 default_args 딕셔너리에 있는 값
- 오퍼레이터 기본 값
태스크에서는 무조건 task_id, owner 값을 1, 2번 방식으로 전달 받아야 exception이 발생하지 않는다.
3. Data Interval
DAG의 실행은 개념적으로 특정일과 시간을 나타내는것이 아니라 data interval이라고 불리는 두 시간간의 간격을 나타낸다. (원문)
Airflow에서 각 DAG 실행에는 작업하는 시간 범위를 나타내는 data interval이 할당된다. 일반적으로 관련 data interval이 종료된 후 실행이 예약된다. 공식 문서에서는 해당 주기(hourly, daily, weekly, monthly, yearly...) 기간 내 모든 데이터를 수집할 수 있도록 보장하기 위해서 이 데이터 간격이 확보하기 위한것으로 설명하고 있다.
예시로 2023-01-01 데이터를 커버하는 실행은 2023-01-01 00시에 실행되는 것이 아니라 해당 일자가 종료된 후, 즉 2023-01-02 00:00:00 이후에 시작되는 것이다.
Airflow에서 쓰이는 모든 날짜 개념(logical_date, start_date...)은 어떻게로든 data interval 개념과 연결되어 있다.
이중 아래 설명하고 있는 logical_date는 바로 이 data interval의 시작지점을 나타낸다. logical_date가 DAG가 실행되는 시점을 나타내는 것이 아니다!!!
마찬가지로 start_date로 설정된 날짜도 처음 실행되는 dag_run의 logical_date를 가리키는 것이다. 따라서 DAG 실행은 start_date로 명시된 날짜 이후 1 data interval이 지나야 예약된다!!!
DAG Runs — Airflow Documentation
airflow.apache.org
3-1. logical_date
💡 DAG 실행에서 잡히는 logical_date는 data interval의 시작이다.
실제로 DAG가 물리적으로 실행되는 것은 '지금'일지라도 logical_date는 스케쥴러가 특정일자와 시간으로 태스크를 실행되고 있는 것으로 시뮬레이션한다🤖.
그래서 물리적으로(physically) 실행되는 시점과 다른 개념 이름을 쓰기 위해서 logical_date라는 이름으로 정립된거 같다. 초반 버전에는 execution=logical로 혼용되어서 정말 헷갈렸다. (Airflow 2.2 버전 이후부터는 execute_date용어는 logical_date로 대체되었다.) 협업할때 시점에 대해서 여러 용어를 섞어서 말하고 그와중에 헷갈리기도 해서 팀원분의 혼란스러워했던 모습에 책임이 느껴졌다...
We said the scheduler runs your task "for" a specific date and time, not "at".
스케쥴러가 특정일 특정 시간 "동안" 태스크를 실행하는 개념이며 특정 시간"에" 태스크를 실행하는 것이 아니다.
(사용자 입장에서는 특정 시간에 태스크가 돈다고 보는게 더 간단하지 않나..? 왜...왜이렇게 하는거지... 크론잡과 다르다는 건가...)
To Be Continue...
데이터 파이프라인 작업에서 Airflow는 툴같은 느낌이었다. 그런데 필요한 기능만 그때그때 참고해서 쓰다보니 어느시점부터 logical_date같은 간단해보이는 개념에서 자꾸 막혔다. 용어도 버전업되면서 더이상 사용되지 않는 파라미터에 값이 들어가거나... '알고 있다고 생각한' 부분에서 삐그덕하니 이번 기회에 Airflow가 어떤 의도로 파라미터들을 지원하는지부터 볼 필요가 있었다. 이번 포스트에서는 그동안 당연하게 생각한 개념에 대해서 짧게 소개하였고 다음에 위 개념들에서 타고 들어가서 소스 코드나 실습, 심화 개념에 대해서 다룰 예정이다.
'DataEngineering' 카테고리의 다른 글
Spark tuning (0) | 2024.03.17 |
---|---|
[Spark] Spark run in Cluster Mode-YARN (1) | 2024.02.04 |
[Airflow] Operators (0) | 2024.01.21 |
MapReduce (0) | 2023.01.07 |
Hadoop (0) | 2023.01.07 |