하나의 오퍼레이터는 Airflow에서 완수해야하는 하나의 기본 단위이다.
이번 포스트에서는 가장 많이 쓰이는 오퍼레이터인 BashOperator, PythonOperator, KubernetesPodOperator 등 모든 오퍼레이터가 상속받는 BaseOperator 내용을 보면서 필수 인자와 각 오퍼레이터별 별도 필요 인자를 살펴본다.
BaseOperator
Abstract base class for all operators.
모든 오퍼레이터들을 위한 추상 클래스
오퍼레이터들은 DAG 내의 노드가 되는 객체들을 생성하기 때문에, BaseOperator는 DAG 크롤링 행동을 위한 많은 재귀 메서드들을 포함하고 있다. 이 클래스에서 파생되기 위해서는, 생성자와 'execute' 메서드를 재정의하면 된다. BaseOperator 클래스로부터 파생된 오퍼레이터는 특정 태스크를 동기적으로 수행하거나 트리거한다. 이 클래스에서 가지고 있는 파라미터는 모든 오퍼레이터 클래스에서 명시되어 있지 않아도 상속받아 기본으로 쓰인다고 볼 수 있다.
parameter | type | description | example |
task_id | str | 태스크 고유 이름. | load_data |
owner | str | 태스크 수행 책임자 | team/user/role name 권장 |
str | Iterable[str] | None | 수신 이메일 | ||
email_on_retry | bool | 재실행시 이메일 발신 여부 | |
email_on_failure | bool | 실패시 이메일 발신 여부 | |
retries | int | None | 실패시 자동 재실행 횟수 | |
retry_delay | datetime.timedelta | float | 실패시 재실행 간격 | timedelta(seconds=300) |
retry_exponential_backoff | bool | 재실행 횟수가 늘어날 수록 재실행 간격을 늘리는 여부 | |
max_retry_delay | 최대 재실행 간격 | ||
start_date | datetime.datetime | None | 태스크 인스턴스 최초 수행 일자. dag의 schedule_interval에 맞는 최신 일시를 권장한다. | dag 차원에서만 start_date를 설정했는데 특정 날짜 이전에 해당 태스크는 실행되지 않도록 하는 옵션 차원으로도 사용할 수 있는지 확인 필요 |
end_date | datetime.datetime | None | 설정된 날짜 이후에는 태스크가 실행되지 않는다 | |
depends_on_past | bool | 이 설정이 활성화되어 있으면 태스크는 이전 실행이 성공하거나 스킵된 상태일때만 실행될 수 있다 | |
wait_for_past_depends_before_skipping | bool | depends_on_past값과 함께 설정된다면 앞선 의존성 태스크 실행이 실행되는 동안 대기하고 있는다 | |
wait_for_downstream | bool | 해당 태스크에 대한 의존성 있는 태스크들이 모두 완료될때까지 대기한다. 앞선 dag_run완료에 대한 의존성이 있는 경우 유용하다. depends_on_past 값도 true로 같이 설정되는 점 주의 필요하다. | |
dag | airflow.models.dag.DAG | None | 태스크가 붙어있는 dag 명시 | |
priority_weight | int | 큰 수를 설정할수록 다른 태스크보다 우선순위를 높일 수 있다.작업이 밀려있을 때 executor가 우선순위가 높은 태스크부터 트리거 한다. | |
weight_rule | str (default: downstream) | downstream으로 설정되어있을때 태스크는 자신 밑으로 연결되어 있는 downstream 태스크 수 총합으로 priority_weight가 설정된다. | |
queue | str | 특정 큐를 지정하는 옵션 celeryExecutor에서는 지원되지 않는다. | |
pool | str | None | 해당 태스크 실행에 동시성 제한하는 방법 설정 | |
pool_slots | int | 해당 태스크가 사용하는 pool stot 개수. 1보다 큰 값으로 설정되어야한다. | |
sla | datetime.timedelta | None | 작업이 성공해야할 예상 시각 | |
execution_timeout | datetime.timedelta | None | 최대 허용 실행시간 | |
on_failure_callback | None | airflow.models.abstractoperator.TaskStateChangeCallback | list[airflow.models.abstractoperator.TaskStateChangeCallback] | 실패시 실행되는 콜백 지정. context 딕셔너리가 유일한 인자로 전달된다. | |
on_execute_callback | 태스크 실행 직전에 수행되는 콜백 | ||
on_retry_callback | 재실행시 수행되는 콜백 | ||
on_success_callback | 성공시 수행되는 콜백 | ||
pre_execute | 태스크 수행 직전에 호출되는 함수이며 context 딕셔너리를 받는다. 해당 함수에서 조건을 만족하지 못한다면 태스크가 실행되는 것을 방지할 수 있다. | ||
post_execute | 태스크가 완료된 직후 호출되는 함수. | ||
trigger_rule | str (default: all_success) | 트리거 받을때 의존성 있는 태스크 상태 설정 | |
resources | dict[str, Any] | None | 리소스 인자 이름-값 | |
run_as_user | str | None | 작업이 실행되는 동안 사용할 unix username 설정 | |
max_active_tis_per_dag | int | None | 동시 실행 태스크 수 제한 | |
max_active_tis_per_dagrun | dag_run에서 동시 실행 태스크 수 제한 | ||
executor_config | 특정 executor가 전달받는 추가적인 태스크 레벨의 설정값 | ||
do_xcom_push | bool | Xcom에 오퍼레이터 결과 포함 | |
task_group | 태스크가 소속되어 있는 태스크그룹 명시 | ||
logger_name |
많은 파라미터들이 있고 설정하지 않을 시 기본으로 설정된 값으로 실행되기 때문에 크게 신경쓰지 않았던 값이 많았다. 하지만 이렇게 파라미터를 한번 정리하면서 무신경하게 쓰던 dag에서도 좀 더 쉽게 원하는 경우에만 태스크를 실행하게 하거나 우선순위를 조절할 수 있는 파라미터를 확인할 수 있었다. 지금 생각해보니 최대한 오퍼레이터 레벨에서 설정할 수 있는 부분이 많았다. 이중 task weight, pool을 어떨 때 사용할 수 있는지 생각해보고 정리하자면 다음과 같다.
Task weight
weight_rule의 경우 데이터 파이프라인에서 많은 dag을 돌리고 backfill을 하다보면 순서가 상관은 없지만 크기가 크거나 필요가 큰 데이터는 executor나 scheduler에서 먼저 큐에 들어가야될거 같은 상황이 있다. 그럴때 계속 대기 상태이거나 동시에 너무 돌리는 태스크가 많을때 우선순위에서 밀리는 부분을 볼때 어떤 설정을 더 해줘야했는지 막힐때가 있었다.
- weight_rule
- downstream: downstream 태스크 수 총합으로 priority_weight가 설정된다.
- upstream: upstream 태스크 수 총합으로 설정
- absolute: 의존하고 있는 노드수 같은 추가 가중치 없이 설정한 priority_weight 그대로 쓰인다. 각 작업이 가져야할 우선 순위가 복잡하지 않고 관리할 수 있는 규칙이 있다면 이 설정을 활용할 수 있다. 부가적인 효과로 매운 큰 DAG에 대한 태스크 생성 과정이 상당히 빨라질 수 있다고 한다.
Pool
동시에 실행되는 태스크가 많을때 개수를 제어하지 못해서 부하로 터지는 것을 방지하기 위해서 Pool을 설정하여 동시 태스크를 제한한다. 특히 airflow 시스템에 부하도 있지만 spark 자원을 많이 사용하는 경우에도 배치 작업을 제어하는 오케스트레이션 단에서 이를 관리해야 할 필요가 있다. 이때 Pool에 들어간 작업들에 대한 실행에는 task_weight로 설정된 우선순위가 반영된다.
참고 https://wookiist.dev/171
BashOperator
bash 환경에서 커맨드를 실행하는 bashoperator는 baseoperator를 상속받기 때문에 위 파라미터 값을 설정할 수 있고 추가로 해당 작업 특성에 맞는 파라미터를 따로 갖는다. 클래스 내부 코드를 보면 인스턴스 생성하는 __init__() 단계에서 우선 super().__init__(**kwargs) 로 부모 메서드를 호출하여 상속받는 파라미터들을 갖는다.
해당 태스크에서 커맨드 결과가 0으로 종료되는 경우 Airflow에서는 기본적으로 성공을 처리하고 이외의 값으로 종료될 시 skip_on_exit_code를 설정하지 않는한 실패로 처리한다.
parameter | type | description | example |
bash_command | str | bash 커맨드 | |
env | dict[str, str] | None | 환경변수 | |
append_env | bool (default: False) | 실행되는 환경의 환경변수 상속 여부 | |
output_encoding | |||
skip_on_exit_code | int | Container[int] | None (default: 99) | exit 코드로 태스크가 나오개 되면 skipp 상태로 둔다. None으로 해당 설정을 하지 않으면 0이 아닌 코드로 끝난 작업은 실패로 처리된다. | |
cwd | str | None | 커맨드가 실행되는 디렉토리 경로 설정. 설정되지 않았다면 커맨드는 temp 경로에서 실행된다. |
PythonOperator
airflow.operators.python — Airflow Documentation
airflow.apache.org
parameter | type | description | example |
python_callable | Callable | 파이썬 함수 지정 | |
op_kwargs | Mapping[str, Any] | None | 파이썬 함수에 전달할 딕셔너리 인자 | { 'name': 'bmo' } |
op_args | Collection[Any] | None | 파이썬 함수에 전달할 리스트 인자 | ['bmo'] |
templates_dict | dict[str, Any] | None | Ariflow 엔진에서 자체적으로 세팅된 딕셔너리 값 | 'task_instance', 'run_id'... |
show_return_value_in_logs | bool (default: True) | 반환값 조회 가능 여부 큰 데이터를 반환하는 함수를 사용하는 경우 False로 설정하는 것이 Xcom으로 인한 낭비를 줄일 수 있다. |
ShortCircuitOperator
pythonOperator를 상속받아서 태스크 차원에서의 '조건'을 체크하는 태스크를 구현할 수 있다. ShortCircuitOperator로 구현된 태스크에서 python_callable로 지정된 함수의 반환값이 True라면 그 뒤 태스크들로 넘어가고 False라면 해당 태스크의 downstream 태스크들은 모두 skip된다. 이 로직은 위에 BaseOperator에서 소개된 trigger_rule을 유지하거나 무시하도록 설정할수도 있다.
parameter | type | description | example |
ignore_downstream_trigger_rulse | bool (default: True) | True인 경우 downstream 태스크들이 설정된 trigger_rule 상관없이 모두 skip된다. False인 경우 직속으로 연결된 후속 태스크 이외에는 설정된 tirgger_rule에 따라 실행될 수 있다. |
내부 로직 코드를 봤을때 실행 메서드(execute)에서 상속받는 PythonOperator의 execute대로 실행된 결과가 condition이라는 이름의 변수에 저장된다. 그후 ignore_downstream_trigger_rulse 값에 따라 후속 태스크의 skip 여부를 설정한다.
Continue...
보통은 태스크 로직은 개별 파일로 저장하고 이를 실행하는 커맨드를 오퍼레이터에 전달해서 처리하는 식으로 많이 활용하고 있다. 그외에 비즈니스 로직이나 태스크간 관계, DAG로 분리된 로직간 관계가 점점 커지는 느낌인데... DAG 정의에서 설정해주는 파라미터와 개별로 설정할수 있는 태스크 파라미터를 잘 조합해서 최대한 관리 리소스를 줄이고 잘 관리되는 배치 작업을 구현하고 싶다. 다음 포스트에서는 위에서 소개한 파라미터를 사용했을때 확인되는 결과를 공유하거나 또다른 오퍼레이터를 소개할거 같다.
'DataEngineering' 카테고리의 다른 글
Spark tuning (0) | 2024.03.17 |
---|---|
[Spark] Spark run in Cluster Mode-YARN (1) | 2024.02.04 |
Airflow Concept (0) | 2023.12.24 |
MapReduce (0) | 2023.01.07 |
Hadoop (0) | 2023.01.07 |