반응형
Chapter 12. RDD¶
- 저수준 API 이며, 분산형 공유변수이다
12.1 저수준 API 이란¶
- 스파크에는 두 종류의 저수준 API 존재
- 분산 데이터 처리를 위한 RDD
- 분산형 공유 변수를 배포하고 다루기 위한 브로드 캐스트 변수, 어큐뮬레이터
12.1.1 저수준 API 는 언제 사용할까¶
- 고수준 API 에서 제공하지 않는 기능이 필요한 경우
- RDD를 사용해 개발된 기존 코드를 유지해야 하는 경우
- 사용자가 정의한 공유 변수를 다뤄야 하는 겨우
--> 위와 같은 상황에서 저수준 API 사용해야함
- 스파크의 모든 워크로드는 저수준 기능을 사용하는 기초적인 형태로 컴파일되므로 이를 이해하는 것은 큰 도움이 됨
- DF 트랜스포메이션 호출시, 다수의 RDD 트랜스포메이션으로 변환
- 저수준 API 는 세밀한 제어 방법을 제공하여 개발자가 치명적인 실수를 하지 않도록 도와줌
12.1.2 저수준 API 는 어떻게 사용할까¶
SparkContext
는 저수준 api 기능을 사용하기 위한 진입 지점SparkSession
을 이용해SparkContext
에 접근할 수 있음
spark.sparkContext
12.2 RDD 개요¶
- 스파크1.X 버전의 핵심이었지만 2.X 버전에서는 잘 사용하지 않음
- 사용자가 실행한 모든 DF , DS 코드는 RDD 로 컴파일됨.
RDD 는 불변성을 가지며 병렬로 처리할 수 있는 파티셔닝된 레코드의 모음
- RDD 의 모든 레코드는 자바, 파이썬의 객체이므로 완벽하게 제어 가능함
- 사용자가 원하는 포맷을 사용해 원하는 모든 데이터 저장 가능
- 하지만, 모든 값을 다루거나, 값 사이의 상호작용 과정을 반드시 수동으로 정의해야하는 단점
- 내부구조 파악이 힘들어 최적화를 위해서는 많은 수작업 필요
12.2.1 RDD 유형¶
- RDD 에는 수많은 하위 클래스가 존재
- DF API 에서 최적화된 물리적 실행계획을 만드는 데 대부분 사용
제네릭 RDD
과키-값 RDD
두가지의 RDD 를 만들 수 있음
RDD 의 주요 다섯가지 속성¶
- 파티션의 목록
- 각 조각을 연산하는 함수
- 다른 RDD 와의 의존성 목록
- 부가적으로 키-값 RDD 를 위한 Partitioner
- 부가적으로 각 저각을 연산하기 위한 기본 위치 목록
- 이러한 속성은 스케쥴링하고 실행하는 스파크의 모든 처리방식 결정
- 분산환경에서 데이터를 다루는데 필요한 지연처리 방식의
트렌스포메이션
과 즉시 실행 방식의액션
을 제공 - 로우라는 개념은 없음
일반적으로 분산 게이터에 세부적인 제어가 필요할 때 RDD 사용
12.3 RDD 생성하기¶
13.3.1 데이터프레임, 데이터셋을 RDD 로 생성¶
In [1]:
//스칼라 코드
spark.range(500).rdd
Intitializing Scala interpreter ...
Spark Web UI available at http://win10-210321GO:4040 SparkContext available as 'sc' (version = 2.4.8, master = local[*], app id = local-1626093427850) SparkSession available as 'spark'
Out[1]:
res0: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[5] at rdd at <console>:27
In [2]:
// 스칼라 코드
spark.range(10).toDF().rdd.map(rowObject => rowObject.getLong(0))
Out[2]:
res1: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[12] at map at <console>:27
12.3.2 로컬 컬렉션으로 RDD 생성¶
- 컬랙션 객체를 RDD 로 만들려면
sparkContext
의Parallelize
메서드를 호출해야함 - 단일 노드에 있는 컬렉션을 병렬 컬렉션으로 전환
In [4]:
//스칼라 코드
val myCollection = "Spark The Definitive Guide : BigData Processing Made Simple".split(" ")
val words = spark.sparkContext.parallelize(myCollection,2)
Out[4]:
myCollection: Array[String] = Array(Spark, The, Definitive, Guide, :, BigData, Processing, Made, Simple) words: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14] at parallelize at <console>:28
In [6]:
//스칼라 코드
// RDD 에 이름을 지정하면 스파크 UI 에 지정한 이르므로 RDD 가 언다
words.setName("myWords")
words.name
Out[6]:
res3: String = myWords
12.3.3 데이터 소스로 RDD 생성하기¶
- DataSource API 를 사용해 RDD 를 생성하는 것이 바람직하다.
- RDD 에는 데이터프레임이 제공하는 DataSource API 라는 개념이 없음
- RDD 는 주로 RDD 간의 의존성 구조와 파티션 목록을 정의
- DataSource API 는 데이터를 읽는 가장 좋은 방법
sparkContext
를 사용해 RDD 를 읽을 수 있음
12.4 RDD 다루기¶
- 데이터 프레임을 다루는 방식과 매우 유사
- 스파크 데이터 타입 대신 자바, 스칼라의 객체를 다룬다는 사실이 가장 큰 차이
- 헬퍼 메서드나 함수도 데이터프레임에 비해 부족하기에 데이터프레임의 다양한 함수를 사용자가 직접 정의해야 함
12.5 트렌스포메이션¶
- 구조적 API 에서 사용할 수 있는 기능을 가지고 있음
- RDD 에 트랜스포메이션을 지정해 새로운 RDD 생성 가능
In [9]:
// DISTINCT - 중복된 데이터 제러
words.distinct().count()
Out[9]:
res6: Long = 9
In [10]:
// filter - sql 의 where 조건절과 비슷, 조건함수를 만족하는 레코드만 반환
def startsWithS(individual:String) = {
individual.startsWith("S")
}
Out[10]:
startsWithS: (individual: String)Boolean
In [13]:
// map - 주어진 입력을 원하는 값으로 반환하는 함수를 명시하고 레코드별로 적용
val words2 = words.map(word => (word, word(0), word.startsWith("S")))
Out[13]:
words2: org.apache.spark.rdd.RDD[(String, Char, Boolean)] = MapPartitionsRDD[18] at map at <console>:27
In [14]:
// flatMap - map 함수의 확장 버전, 확장 가능한 map 함수의 출력을 반복 처리할수 있는 형태로 반환
words.flatMap(word => word.toSeq).take(5)
Out[14]:
res7: Array[Char] = Array(S, p, a, r, k)
In [16]:
// sortBy - RDD 정렬의해 사용, 데이터 객체에서 값을 추출한 다음 값을 기준으로 정렬
words.sortBy(word => word.length() * -1).take(2)
Out[16]:
res9: Array[String] = Array(Definitive, Processing)
In [17]:
// randomSplit - RDD 를 임의로 분할해 RDD 배열을 만들때 사용
val fiftyFiftySplit = words.randomSplit(Array[Double](0.5, 0.5))
Out[17]:
fiftyFiftySplit: Array[org.apache.spark.rdd.RDD[String]] = Array(MapPartitionsRDD[25] at randomSplit at <console>:27, MapPartitionsRDD[26] at randomSplit at <console>:27)
12.6 액션¶
- RDD의 모든 값을 하나의 값으로 만들려면
recude
매서드를 사용 - 리듀스 연산은 비 결정적 특성을 가짐
count
: RDD 의 전체 로우 수를 알 수 있음.countApprox
: count 함수의 근사치를 제한 시간 내 계산, 제한시간 초과시 불완전한 결과 반환할 수 있음countByValue
: RDD 값의 개수를 구한다, 결과 데이터 셋을 메모리로 읽어들여 처리한다.countByValueApprox
: count와 동일 연산 수행, 근사치 계산first
: 데이터셋의 첫번째 값을 반환max
,min
: 최댓값과 최솟값을 반환take
: RDD 에서 가져올 값의 개수를 파라미터로 사용, 메서드는 먼저 하나의 파티션을 스캔, 다음, 파티션의 결과 수를 이용해 파라미터로 지정된 값을 만족하는데 필요한 추가 파티션 수 예측
12.7 파일 저장하기¶
- RDD 를 사용하면 일반적의미의 데이터 소스에 저장할 수 없음
- 각 파티션 내용을 저장하려면 전체 파티션을 순회하면서 외부 데이터베이스에 저장해야 함
- 바로 위 방식은 고수준 API 내부 처리 과저이을 저수준 API 로 구현하는 접근법
- 스파크는 하둡 에코시스템을 기반으로 성장하여, 다양한 하둡기능과 잘 호환
- 시퀀스 파일은 바이너리 키-값 쌍으로 구성된 플랫 파일, 맵리듀스의 입출력 포멧으로 널리 사용
- 하둡 파일 포맷을 사용하면 클래스, 출력 포맷, 하둡 설정 그리고 압출방식을 지정할 수 있음
12.8 체크포인팅¶
- 데이터 프레임 api 에서 사용할 수 없는 기능 중 하나는 체크포인팅
- 체크포인팅은 RDD 르 디스크에 저장하는 방식
- 저장된 RDD 를 참조할 때 원본 데이터소스를 다시 계산해 RDD 를 생성하지 않고 디스크에 저장된 중간 결과 파티션을 참조
반응형
'Data Engineering > Spark' 카테고리의 다른 글
[Spark] EMR Spark 재시작 하기 (0) | 2022.03.03 |
---|---|
[ Spark ] 스파크 간단 스터디 7 (0) | 2021.07.18 |
[ Spark ] 스파크 간단 스터디 5 (2) | 2021.07.11 |
[ Spark ] 스파크 간단 스터디 4 (0) | 2021.07.09 |
[Spark] 스파크 간단 스터디 3 (0) | 2021.07.07 |