ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spark RDD, DataFrame, Dataset
    DataEngineering/Spark 2021. 7. 26. 05:49

    Spark 에서 제공하는 데이터 API들

    RDD(2011) DataFrame(2013) Dataset(2015)
    -가장 기본적인 저수준 API
    -JVM object들의 distribute collection
    -table join, 최적화 등을 사용자가 직접 정의해야함
    -고수준 API
    -Row object들의 distribute collection
    -사용자 정의 함수 사용 가능
    -schema less
    -고수준 API
    -내부적으로는 rows, 외부적으로는 JVM object들의 ,,
    -사용자가 데이터를 객체화해서 사용할 수 있도록 사용자에게 도메인 객체를 제공
    -자바, 스칼라만 가능
    -자동으로 schema 가짐

    RDD(Resilient Distributed Dataset)

    [특징]

    • In-memory Computation ; 중간 데이터 결과를 디스크가 아닌 메모리가 저장해 성능을 향상시킨다.
      • MapReduce의 경우는 모든 노드 간에 데이터를 전송할 때 항상 디스크에 저장되어야 하고 항상 MR 작업의 반복을 통해서만 처리가 가능하기 때문에 Disk IO가 가장 큰 병목 중 하나이다. Spark는 영속성을 보장하는 RDD를 통해 메모리가 충분할 경우 메모리에서 모든 데이터를 처리하고 저장하는 것이 가능하기 때문에 기존 MR의 한계를 극복하고 성능을 크게 향상시킨것!
      • 메모리에 저장할지 디스크에 저장할지 엔진단에서 실행을 해준다.
                       
      • Lazy-Evaluation ; 모든 transformation 처리는 지연해서 수행됨, action이 수행되기 직전까지 데이터 처리 과정을 분석해 최적의 DAG를 구성할 수 있도록 함
        • API를 통해 생성된 데이터 파이프라인 작업이 Action API가 호출되기 전까지는 실제로 아무런 동작을 하지 않는 것이다. count, write등 Action API 호출 시점에 여태까지의 변환 작업을 최적화하여 수행할 수 있도록 DAG를 생성한다.
        • 이게 좋은건가??? 🤔
        yes ;)
      • 첫번째로, 실제 수행되어야 하는 작업 단계 사이의 수행 & 대기 시간이 줄어들어 시간 복잡도가 줄어들고
      • 두번째로, 최적화를 통해 필요 없는 작업을 줄여서 공간 복잡도가 줄어들고
      • 세번째로, 데이터 처리가 필요한 경우에만 작업이 수행되기 때문에 리소스가 효율적으로 활용된다. 예를 들어 스트리밍 처리 시에는 데이터가 존재하는 경우에만 트리거링 되기 때문에 리소스면에서 효율적!
    • Fault tolerance ; 최초에 계획했던 / 정상적으로 저장된 이후부터의 모든 변환 이력들을 가지고 있음 → 실패 시 해당 시점부터 다시 처리 가능
      • 이런 장애 발생에도 작업 수행에 문제가 없는 특성은 HDFS의 fault-tolerant file system 특성을 갖고 있기 때문이다.
      • 때문에 streaming 처리의 경우에도 해당 애플리케이션이 사용하는 데이터 소스의 특성이 fault-tolerant 하지 않을 경우 장애를 회피할 수는 없다. (replica가 1인 경우 등)
      • 자신의 Deterministic Lineage 정보를 알고 있다.
        • Lineage ? 계보, 어떤 데이터 소스에서 어떤 변환을 거쳐서 어떤 결과물이 생성되었는지 이런 정보들을 갖고 있다는 것
                 
      • 이러한 변환 과정에서 take(3) action 작업 수행 도중 컨테이너에 문제가 발생한다면 복구 불가능할 경우 CSV read 부터 다시 시작하는거야! 정상적인 노드에서 동일한 작업을 다시 수행할 수 있는 것이다.
    • Immuntable ; 처리되는 모든 데이터는 불변성을 가짐 → 분산 환경에서 공유하고 사용할 때 동시성 문제가 없음
      • 불변성이라는게 뭐가 불변한다는 걸까. RDD는 분산 저장된 데이터를 가리키는 가상의 데이터 집합 개념이며 로컬 메모리에 존재하여 변경할 수 없는 객체다. 모든 RDD의 변환을 위해서는 기존 RDD를 바꾸는 변환이 아니라 새로운 RDD를 생성해야만 한다.
    • Partitioning ; 병렬성을 결정짓는 기본적인 단위, 각 파티션은 변경 가능한 논리적인 구분 분할 단위이다.
      • Word count 데이터가 물리적으로 다른 장비의 파티션 수준에서 관리된다.
      • 이 파티션 형태가 다음 작업 수행 성능에 영향을 준다.
    • Persistence ; 사용자는 RDD의 영속성을 결정할 수 있는 저장소 전략을 메모리 혹은 디스크로 직접 지정할 수 있다.
      • ex
        • MEMORY_ONLY ; 얘가 디폴트, 자바 객체를 그대로 유지한다.
        • MEMORY_AND_DISK ; 메모리 부족하면 디스크 써라.
        • MEMORY_ONLY_SER ; 직렬화된 메모리 객체만 써라.
    • Corse-grained Operations ; filter, group by, map 등의 연산은 해당 데이터 집합의 모든 element에 적용된다.
    • Location-awareness ; DAG scheduler는 개별 작업이 필요로 하는 데이터가 가능한 가장 가까운 노드에 배치될 수 있도록 정의한다.

    [operation]

    • Transformation
      • 기존에 존재하는 데이터로부터 새로운 데이터 집합을 생성 혹은 변환한다.
      • map, filter, union, distinct, join, group by 등
      • 변환 특징에 따라 셔플이 발생하지 않는 map side 변환인 Narrow Transformation 이 있고, 셔플이 발생하는 map+reduce 변환인 Wide Transformation 으로 구분할 수 있다.
    • Action
      • 데이터셋의 값을 반환하거나 저장하는 역할이다.
      • collect, count, first, take, saveAsTextFile 등

    DataFrame & Dataset

    RDD 는 저수준의 데이터 변환 또는 domain specific한 라이브러리를 사용해야 하거나 데이터를 읽어들이기 위한 구현이 별도로 필요한 경우에 한해서만 사용된다.

    ✅ DataFrame이 일반적으로 가장 많이 활용되는 데이터 유형이고, 범용성, 코드 유지 보수 측면에서도 가장 추천된다. SparkSQL 도 DataFrame 기반이니까 !

    • python pandas, R 에서 제공하는 DataFrame처럼 table형태로 구조화된 데이터이다. 이제 차이는 Spark DataFrame은 분산 처리를 기반으로 한 설계로 빠르고, 고성능을 위해 효율적으로 카탈리스트 엔진을 운영한다는 것.
    • 사용자는 쉽게 DataFrame을 통해서 데이터를 작업하면 저수준 RDD로 돌아갈 때 거치는 일련의 과정들을 통해 Catalyst Optimizer 라는 최적화 기법이 들어간다.

    ✅ 그럼 Dataset은 ? 컴파일 시에 타입을 명시적으로 정의해야 할 필요가 있거나, 여러 software 엔지니어들이 큰 어플리케이션을 구현할 때 명시적인 인터페이스를 통해서 개발할 경우 유용하다. (그니까 type-safe 한 데이터 집합이 필요할 때 !) 정적 데이터 타입을 지원해야 하므로 스칼라랑 자바만 지원함

    ✅ SparkContext → SparkSession

    • 기존 RDD에서는 SparkContext를 생성하지만 DataFrame 및 Spark SQL API는 SparkSession 객체를 생성한다! (스파크 2부터는 RDD도 생성이 가능하긴 하다.)
    • 둘의 차이는 스파크 초기 버전에는 다른 api들에 대한 시작점으로 다양한 Context가 있었다. (Core API를 위한 SparkContext, SparkSQL API를 위한 SQLContext, Dstream API를 위한 Streaming Context 등등) → 스파크 2.0부터 하나의 진입점으로 SparkSession이 있고, 여기서 앞에서 말한 다양한 진입점 Context들로 갈 수 있다.

    Catalyst Optimizer

    앞에서 잠깐 언급한 catalyst optimizer를 좀 더 설명을 해야함,

    • Spark SQL Catalyst Optimizer

    : 사용자의 쿼리를 물리적인 실행계획으로 생성해내는 엔진이다. 규칙 기반으로 논리 계획을, 비용 모델을 기반으로 물리 계획을 최적화함으로써 최적의 질의 수행을 할 수 있도록 계획을 해주는 아이라고 생각하면 된다.

    'DataEngineering > Spark' 카테고리의 다른 글

    Spark Intro  (0) 2021.07.26

    댓글

Designed by Tistory.