[Spark] Spark run in Cluster Mode-YARN

이전까지 spark 스크립트를 배치로 돌리면서 클러스터에서 spark-submit커맨드를 보내기 급급했는데 어플리케이션이 클러스터에서 어떻게 실행되는지 설명할 수 있어야겠다고 생각이 들었습니다. 이번 포스트에서는 spark-submit으로 클러스터에 띄운 어플리케이션이 어떻게 실행되는지 공식 문서 내용을 정리합니다.

 

용어 설명
Task executor에 전달되는 작업 단위
Job Spark action에 대한 응답으로 생성되는 여러 태스크로 구성된 병렬 연산. 각각의 작업을 실행하여 전체적인 처리 과정을 분산시킵니다.
Stage 개별의 job은 stage라고 불리는 소규모 태스크 세트로 분할됩니다. 
Application Spark 스크립트로 빌드된 프로그램. 클러스터에 driver와 executor로 구성됩니다. SparkContext instance
driver program 어플리케이션의 main() 함수를 실행하는 프로세스. SparkContext를 생성합니다.
Cluster 아파치 스파크에서 "클러스터"는 상호 연결된 컴퓨터들의 네트워크를 의미합니다. 이 클러스터는 여러 대의 컴퓨터가 자원을 공유하며 분산 컴퓨팅 작업을 수행합니다. 스파크 클러스터에서는 각기 다른 노드(컴퓨터)가 CPU와 메모리 같은 컴퓨팅 자원을 활용하여 대규모 데이터 세트를 처리합니다. 이렇게 병렬 처리를 통해 데이터 처리와 분석 속도를 크게 향상시킬 수 있으며, 이는 대용량 데이터를 다루는데 이상적입니다.
Cluster manager 클러스터에 자원을 가져오는 외부 서비스 (Memos, YARN, Kubernetes...)
Worker node 클러스터 내에서 어플리케이션 코드를 실행시킬 수 있는 노드(컴퓨터)
Executor 워커 노드에 실행되는 프로세스로 태스크를 수행하고 태스크 전반에 데이터를 메모리나 디스크 저장소에 저장합니다. 

Compoenets

Spark는 여러 모듈로 구성되어 있습니다. 크게 두 부분으로 나누어 보면, 컴퓨터 Cluster의 리소스를 관리하는 Cluster Manager와 그 위에서 동작하는 사용자 프로그램인 Spark Application으로 구분됩니다. [클러스터 리소스 최적화를 위한 Spark 아키텍처 ①]

 

먼저 스파크 어플리케이션은 클러스터 위에서 개별적인 프로세스로 실행됩니다. 드라이버 프로그램은 SparkContext를 생성하고 SparkContext는 클러스터를 조율하고 여러 종류의 클러스터 매니저와 연결할 수 있습니다. 일단 연결되면 클러스터 노드에 executor를 확보합니다. 이 executor는 어플리케이션을 위한 연산과 데이터 저장을 실행합니다. 그 다음 executor는 SparkContext에 jar/python 파일로 정의된 어플리케이션 코드를 전달합니다. 끝으로 SparkContext는 executor들에 태스크를 실행하도록 보냅니다. 마지막으로 어플리케이션이 종료될때 SparkContext에서 자원을 해제해 모든 executor 프로세스 작업이 끝납니다. 

 

components


각각의 어플리케이션은 개별적인 executor 프로세스를 갖습니다. 이를 통해서 어플리케이션 전반동안 유지되며 태스크들을 멀티 쓰레드로 실행시킵니다. 이 과정은 스케쥴링, 실행 측면에서 각 어플리케이션들을 격리시키는 이점이 있습니다. 그런데 이 점은 외부 저장 시스템을 사용하지 않는 한 spark 어플리케이션간 데이터가 공유될 수 없다는 것을 뜻합니다.
스파크는 기반 클러스터 관리자에 대해 구체적인 요구사항이 없습니다. 스파크는 실행 프로세스를 확보할 수 있고, 이 프로세스들이 서로 통신할 수 있다면, 다른 애플리케이션을 지원하는 클러스터 관리자에서도 상대적으로 쉽게 실행할 수 있습니다.
드라이버 프로그램은 자신의 수명 동안 executors로부터 들어오는 연결을 계속해서 수신하고 수락해야 합니다.
드라이버는 클러스터에서 작업을 스케줄링하기 때문에, 작업 노드에 가깝게 실행되는 것이 좋으며, 가급적이면 동일한 로컬 영역 네트워크상에서 실행하는 것이 바람직합니다. 클러스터에 원격으로 요청을 보내고 싶다면, 드라이버에 RPC를 열어 작업 노드에서 멀리 떨어진 곳에서 드라이버를 실행하는 것보다는 드라이버가 가까운 곳에서 작업을 제출하도록 하는 것이 더 나을 수 있습니다.

 

Cluster Manager 

YARN

Yet Anaother Resource Nagotiator

하둡 클러스터 자원 관리 시스템. 

 

YARN의 기본 아이디어는 리소스 관리 기능과 job 스케쥴링-모니터링 기능을 분리된 데몬으로 나누는 것이었습니다. 이렇게 명시된 이유를 통해서 YARN 등장 이전에는 리소스관리 기능, 스케쥴링-모니터링 기능이 하나의 인스턴스로 처리되다가 어려움이 있었다는 것을 알 수 있습니다. 전역적인 ResourceManager가 있고 어플리케이션 당 ApplicationManager가 있는 구조입니다. 

 

ResourceMnager는 다음 2가지 컴포넌트를 가집니다. 

  • Scheduler
    스케줄러는 애플리케이션에 리소스를 할당하는 역할을 담당하고 용량, 큐 등의 일반적인 제약 조건을 고려합니다. 스케줄러는 정말 순수 스케쥴러로 애플리케이션의 상태를 모니터링하거나 추적하지 않습니다. 애플리케이션 실패나 하드웨어 고장으로 인한 작업 실패 재시작도 보장하지 않습니다. 스케줄러는 메모리, CPU, 디스크, 네트워크 등을 포함하는 추상적인 '리소스 컨테이너' 개념을 바탕으로 애플리케이션의 리소스 요구 사항에 따라 스케줄링 기능을 수행합니다. 스케줄러는 클러스터 리소스를 다양한 큐와 애플리케이션들 사이에 나누는 정책을 교체 가능한 구조로 가지고 있습니다.
  • ApplicationManager
    ApplicationsManager는 작업 제출을 수락하고, 애플리케이션별 ApplicationMaster를 실행하기 위한 첫 번째 컨테이너를 협상하며, 실패 시 ApplicationMaster 컨테이너를 재시작하는 서비스를 제공합니다. 각 애플리케이션의 ApplicationMaster는 스케줄러로부터 적절한 리소스 컨테이너를 협상하고, 그 상태를 추적하며 진행 상황을 모니터링을 담당합니다.
spark-submit --deploy-mode cluster --queue queue_name {script_file.py} {args: --nums-executors=, --executor-cores=, --executor-memory, --conf=}

Running Spark on YARN

YARN에서 Spark 애플리케이션을 실행하기 위해 두 가지 배포 모드를 사용할 수 있습니다. cluster 모드에서는 드라이버가 클러스터상의 YARN에 의해 관리되는 애플리케이션 마스터 프로세스 내에서 실행되며, 클라이언트는 애플리케이션을 시작한 후 종료될 수 있습니다. client 모드에서는 드라이버가 클라이언트 프로세스에서 실행되며, 애플리케이션 마스터는 YARN에서 자원을 요청하는 데만 사용됩니다.

스파크에서 다른 클러스터 관리자들과 달리, YARN 모드에서는 ResourceManager의 주소가 하둡 구성을 통해 설정됩니다. 따라서 YARN 모드에서 `--master` 매개변수는 'yarn'으로 지정됩니다. 이는 스파크가 YARN 환경에서 작동할 때 하둡의 설정을 기반으로 자동으로 ResourceManager를 식별한다는 것을 의미합니다.

spark-submit을 실행하면 클라이언트는 기본 애플리케이션 마스터를 시작하고, 그 후 애플리케이션 마스터의 자식 스레드로 실행됩니다. 클라이언트는 애플리케이션 마스터의 상태 업데이트를 주기적으로 폴링하고 콘솔에 표시합니다. 애플리케이션이 실행을 마치면 클라이언트는 종료됩니다.

YARN은 사용자 정의 리소스 유형을 지원하며, GPU를 위한 내장 유형을 가지고 있습니다. 스파크에서 GPU 등 리소스를 사용하는 경우, `spark.{driver/executor}.resource.*` 구성만 지정하면 스파크가 이를 YARN 리소스 요청으로 변환할 수 있습니다.

YARN은 스파크에 각 컨테이너에 할당된 리소스의 주소를 알려주지 않기 때문에 사용자는 executor를 시작할 때 사용 가능한 리소스를 찾을 수 있도록 실행하는 스크립트를 지정해야 합니다. 이 스크립트는 실행 권한을 설정해야 하며, 악의적인 사용자가 수정하지 못하도록 권한을 설정해야 합니다.

Stage Level Scheduling

위 용어에서 정의한 것처럼 stage는 개별의 job이 소규모 태스크 세트로 분할된 단위입니다. YARN에서 스테이지 레벨 스케줄링은 동적 할당이 활성화되어 있을 때 지원됩니다. 각 ResourceProfile이 YARN에서 다른 컨테이너 우선 순위를 요구한다는 점에 주의해야합니다. YARN에서 먼저 생성된 프로필이 더 높은 우선 순위를 갖게 되므로 매핑은 간단하게 ResourceProfile ID가 낮은 숫자일수록 더 높은 우선 순위입니다. 큐의 앞쪽에 있는 작업이 전체 클러스터의 리소스를 모두 사용하지 않는 경우, 이후의 작업은 즉시 시작될 수 있습니다. 그러나 큐의 앞쪽에 있는 작업이 크고 상당한 리소스를 소비하는 경우, 나중의 작업은 상당한 지연을 경험할 수 있습니다. 

 

또한 기본 기본 프로필과 사용자 정의 ResourceProfiles 간에 사용자 정의 리소스가 처리되는 방식에 차이가 있습니다. 사용자가 스파크가 해당 리소스에 스케줄링하지 않고도 YARN 컨테이너에 추가 리소스를 요청할 수 있도록 하려면, 사용자는`spark.yarn.executor.resource.*` 설정을 통해 리소스를 지정할 수 있습니다. 근데 이 설정들은 기본 기본 프로필에서만 사용되고 특정 스테이지에서 사용자 정의 리소스를 제거하고 싶을 때 제거할 방법이 없을 수 있어서 이를 방지하기 위해... 다른 사용자 정의 ResourceProfiles로 전파되지 않습니다. 이로 인해 기본 리소스 프로필은 `spark.yarn.executor.resource.*`에서 정의된 사용자 정의 리소스를 포함하게 됩니다.

따라서 리소스 설정을 커스텀하고 YARN에 요청하고 싶다면 다음 config 설정에서 지정해야합니다. (YARN: spark.yarn.{driver/executor}.resource. , Spark: (spark.{driver/executor}.resource.)

 

Scheduling Across Applications

클러스터에서 실행되는 각 spark 어플리케이션들은 태스크를 실행하고 데이터를 저장하는 executor JVM 세트를 각각 가집니다. 만약 다수의 사용자가 동일한 클러스터를 공유한다면 클러스터 매니저에 따라 다른 자원 관리 옵션이 있습니다. 모든 클러스터 매니저에서 가능하고 가장 간단한 옵션은 자원의 static partitioning입니다. 이 방식에서는 각 애플리케이션에 사용할 수 있는 최대 리소스 양이 할당되며, 전체 기간 동안 이 리소스를 유지합니다. 이는 YARN, 그리고 Mesos 모드에서 사용되는 접근 방식입니다. YARN에서는 리소스 할당은 다음과 같이 구성할 수 있습니다. 

  • --num-executors: 클러스터에 몇개의 executor 할당할지 조정
  • --executor-memory, --executor-cores: 각 executor 리소스 지정

주의할 점은 어떤 클러스터 매니저도 현재는 모든 어플리케이션들끼리 메모리를 공유하는 것은 지원하지 않습니다. 만약 각 클러스터에 데이터를 공유하고 싶다면 단일 서버 어플리케이션을 실행시켜 동일한 RDD에 대한 요청을 받는 방식이 추천됩니다.

 

Dynamic Resource Allocation

spark 어플리케이션은 더이상 사용하지 않는 리소스를 클러스터에 반환하고 필요할 때 다시 요청합니다. 이런 특성은 여러 어플리케이션이 클러스터에서 자원을 공유하는 상황에서 유용합니다. 

 

Jobs in PySpark

기본 설정에서 PySpark는 PVM(파이썬 가상 머신) 스레드를 JVM(자바 가상 머신) 스레드와 동기화하는 것을 지원하지 않으며, 여러 작업을 여러 PVM 스레드에서 시작해도 각 작업이 해당하는 JVM 스레드에서 시작되는 것을 보장하지 않는다고 공식문서에서 안내하고 있습니다. 이 제한으로 인해 별도의 PVM 스레드에서 sc.setJobGroup을 통해 다른 작업 그룹을 설정할 수 없으며, 이로 인해 나중에 sc.cancelJobGroup을 통한 작업 취소도 불가능하다고 합니다. 

이러한 이슈를 해결하기 위해 `pyspark.InheritableThread`를 함께 사용하는 것이 권장되며 PVM 스레드가 JVM 스레드에서의 로컬 속성과 같은 상속 가능한 속성을 가져올 수 있다고 합니다.

 

 


글을 처음 시작할때는 spark-submit 이후 클러스터에서 실행되는 spark job과 자원 설정을 한번 보려고 했습니다. 공식 문서의 예시와 각 클러스터 매니저 중 yarn 케이스를 먼저 읽었습니다. 글은..거의 번역으로 끝난거 같은데... 컴포넌트 구조, yarn 아이디어에 대해서 한번 더 복습하고 각 컴포넌트 역할을 기본으로 아키텍쳐를 더 파봐야겠습니다. 

 

참고 링크

- https://spark.apache.org/docs/latest/running-on-yarn.html

 

Running Spark on YARN - Spark 3.5.0 Documentation

Running Spark on YARN Support for running on YARN (Hadoop NextGen) was added to Spark in version 0.6.0, and improved in subsequent releases. Security Security features like authentication are not enabled by default. When deploying a cluster that is open to

spark.apache.org

- https://www.samsungsds.com/kr/insights/spark-cluster-job-server.html

- https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html

 

 

'DataEngineering' 카테고리의 다른 글

Trino 아키텍처와 에러 파악하기  (3) 2024.11.10
Spark tuning  (0) 2024.03.17
[Airflow] Operators  (0) 2024.01.21
Airflow Concept  (0) 2023.12.24
MapReduce  (0) 2023.01.07