반응형
In [1]:
//스칼라 코드
val myCollection = "Spark The Definitive Guide : BigData Processing Made Simple".split(" ")
val words = spark.sparkContext.parallelize(myCollection,2)
Intitializing Scala interpreter ...
Spark Web UI available at http://win10-210321GO:4040 SparkContext available as 'sc' (version = 2.4.8, master = local[*], app id = local-1626600754783) SparkSession available as 'spark'
Out[1]:
myCollection: Array[String] = Array(Spark, The, Definitive, Guide, :, BigData, Processing, Made, Simple) words: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26
13.1 키-값 형태의 기초¶
- RDD 에는 데이터를 키-값 형태로 다룰 수 있는 다양한 메서드 존재
<연산명>.ByKey
형태의 이름을 가짐PairRDD
타입만 사용 가능(맵-연산 구조를 사용해 키-값 구조로 만드는 것임)
In [4]:
words.map(word => (word.toLowerCase,1))
Out[4]:
res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[1] at map at <console>:28
13.1.1 keyBy¶
- keyBy : 현재값으로 키를 생성하는 함수
In [5]:
val keyword = words.keyBy(word => word.toLowerCase.toSeq(0).toString)
Out[5]:
keyword: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at keyBy at <console>:26
In [7]:
keyword
Out[7]:
res4: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at keyBy at <console>:26
13.1.2 값 매핑하기¶
- 튜플 형태의 데이터 사용시 스파크는 첫번째 요소를 키, 두번재 요소를 값으로 추정
- 튜플 형태의 데이터에서 키를 제외하고 값만 추출 가능
mapValues
메서드 사용시 값 수정시 발생할 수 있는 오류를 미리 방지 가능flatMap
함수를 사용해 반환되는 결과의 각 로우가 문자를 나타내도록 확장 가능
In [8]:
keyword.mapValues(word => word.toUpperCase).collect()
Out[8]:
res5: Array[(String, String)] = Array((s,SPARK), (t,THE), (d,DEFINITIVE), (g,GUIDE), (:,:), (b,BIGDATA), (p,PROCESSING), (m,MADE), (s,SIMPLE))
In [11]:
keyword.flatMapValues(word => word.toUpperCase).collect()
Out[11]:
res8: Array[(String, Char)] = Array((s,S), (s,P), (s,A), (s,R), (s,K), (t,T), (t,H), (t,E), (d,D), (d,E), (d,F), (d,I), (d,N), (d,I), (d,T), (d,I), (d,V), (d,E), (g,G), (g,U), (g,I), (g,D), (g,E), (:,:), (b,B), (b,I), (b,G), (b,D), (b,A), (b,T), (b,A), (p,P), (p,R), (p,O), (p,C), (p,E), (p,S), (p,S), (p,I), (p,N), (p,G), (m,M), (m,A), (m,D), (m,E), (s,S), (s,I), (s,M), (s,P), (s,L), (s,E))
13.1.3 키와 값 추출하기¶
- 키-값 형태의 데이터를 가지고 있다면 아래 메서드 통해 키, 값 전체 추출 가능
In [13]:
keyword.keys.collect()
Out[13]:
res10: Array[String] = Array(s, t, d, g, :, b, p, m, s)
In [14]:
keyword.values.collect()
Out[14]:
res11: Array[String] = Array(Spark, The, Definitive, Guide, :, BigData, Processing, Made, Simple)
13.2 집계¶
- 사용하는 메서드에 따라 RDD 나 PairRDD 를 사용해 집계를 수행 가능
countByKey
: 각 키의 아이템 수를 구하고 로컬 맵으로 결과 수집- 제한과 신뢰도를 인수로 지정해 근사치 구할수 있음
In [15]:
val chars = words.flatMap(word => word.toLowerCase.toSeq)
val KVcharacters = chars.map(letter =>(letter, 1))
def maxFunc(left:Int, right:Int) = math.max(left, right)
def addFunc(left:Int, right:Int) = left + right
val nums = sc.parallelize(1 to 30, 5)
Out[15]:
chars: org.apache.spark.rdd.RDD[Char] = MapPartitionsRDD[7] at flatMap at <console>:27 KVcharacters: org.apache.spark.rdd.RDD[(Char, Int)] = MapPartitionsRDD[8] at map at <console>:28 maxFunc: (left: Int, right: Int)Int addFunc: (left: Int, right: Int)Int nums: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:33
In [16]:
// 스칼라 코드
val timeout = 1000L // 밀리세컨드 단위
val confidence = 0.95
KVcharacters.countByKey()
KVcharacters.countByKeyApprox(timeout, confidence)
Out[16]:
timeout: Long = 1000 confidence: Double = 0.95 res12: org.apache.spark.partial.PartialResult[scala.collection.Map[Char,org.apache.spark.partial.BoundedDouble]] = (final: Map(e -> [7.000, 7.000], s -> [4.000, 4.000], n -> [2.000, 2.000], t -> [3.000, 3.000], u -> [1.000, 1.000], f -> [1.000, 1.000], a -> [4.000, 4.000], m -> [2.000, 2.000], i -> [7.000, 7.000], v -> [1.000, 1.000], b -> [1.000, 1.000], g -> [3.000, 3.000], l -> [1.000, 1.000], p -> [3.000, 3.000], c -> [1.000, 1.000], h -> [1.000, 1.000], r -> [2.000, 2.000], : -> [1.000, 1.000], k -> [1.000, 1.000], o -> [1.000, 1.000], d -> [4.000, 4.000]))
13.3 cogroup¶
- 스칼라 사용시 최대 3개, 파이썬 사용시 최대 2개의 키-값 형태의 RDD 그룹화 가능
- 각 키를 기준으로 값을 결합
- RDD 에 대한 그룹 기반의 조인을 수행
cogroup
함수는 출력 파티션 수나 클러스터에 데이터 분산방식을 정확하게 제어하기 위해 사용자 정의 파티션 함수를 파라미터로 사용
In [17]:
import scala.util.Random
val distinctChars = words.flatMap(word => word.toLowerCase.toSeq).distinct
val charRDD = distinctChars.map(c => (c, new Random().nextDouble()))
val charRDD2 = distinctChars.map(c => (c, new Random().nextDouble()))
val charRDD3 = distinctChars.map(c => (c, new Random().nextDouble()))
charRDD.cogroup(charRDD2, charRDD3).take(5)
Out[17]:
import scala.util.Random distinctChars: org.apache.spark.rdd.RDD[Char] = MapPartitionsRDD[16] at distinct at <console>:28 charRDD: org.apache.spark.rdd.RDD[(Char, Double)] = MapPartitionsRDD[17] at map at <console>:29 charRDD2: org.apache.spark.rdd.RDD[(Char, Double)] = MapPartitionsRDD[18] at map at <console>:30 charRDD3: org.apache.spark.rdd.RDD[(Char, Double)] = MapPartitionsRDD[19] at map at <console>:31 res13: Array[(Char, (Iterable[Double], Iterable[Double], Iterable[Double]))] = Array((d,(CompactBuffer(0.9162075857632873),CompactBuffer(0.9695227646190717),CompactBuffer(0.8316103431242687))), (p,(CompactBuffer(0.9453521376467396),CompactBuffer(0.07159719723214009),CompactBuffer(0.13060814613492133))), (t,(CompactBuffer(0.20336973124360547),CompactBuffer(0.18165984149815817),C...
13.4 조인¶
- RDD 는 구조적 API 에서 알아본 것과 거의 동일한 조인 방식을 가지고 있지만, RDD 를 사용하면 사용자가 많은 부분에 관여해야함
- RDD 나 구조적 API 의 조인 방식 모두 동일한 기본 형식을 사용
13.4.2 zip¶
- 두개의 RDD 를 결합하므로 조인이라고 볼 수 잇음
- ZIP 을 사용하면 동일한 길이의 두개의 RDD 를 지퍼를 잠그듯 연결할수 잇으며 PairRDD를 생성
- 두개의 rdd는 동일한 수의 요소와 동일한 수의 파티션을 가져야 한다
13.5 파티션 제어하기¶
- RDD 를 사용하면 데이터가 클러스터 전체에 물리적으로 정확히 분산되는 방식을 정의할 수 있음
- 구조적 API 와 차이점은 VKXLTUS GKATNFMF VKFKALXJFH TKDYDGKF TN DLtDMA
coalesce
: 파티션 재분배할 때 발생하는 데이터 셔플을 방지하기 위해 동일한 워커에 존재하는 파티션을 합치는 메서드, 데이터 셔플링 없이 하나의 파티션으로 합칠 수 있음repartition
: 파티션수를 늘리거나 줄일 수 있지만, 처리시 노드간 셔플 발생 가능, 파티션 수를 늘리면 맵/필터 타입 연산 수행 시 병렬 처리 수준 높일 수 있음
In [18]:
//coalesce
words.coalesce(1).getNumPartitions
Out[18]:
res14: Int = 1
In [19]:
// repartition
words.repartition(10)
Out[19]:
res15: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at repartition at <console>:30
사용자 정의파티셔닝¶
- 사용자 정의 파티셔닝은 RDD 를 사용하는 이유중 하나
- 사용자 정의 파티셔너는 저수준 API 의 세부적인 구현 방식이며 잡이 성공적으로 동작되는지 여부에 상당한 영향을 끼침
- 목표는 데이터의 치우침과 같은 문제를 피하고자 클러스터 전체에 걸쳐 데이터를 균등하게 분배하는 것
- 구조적 API 로 RDD를 얻고 사용자 정의 파티셔너를 적용한 다음 다시 DF 나 데이터 셋으로 변환해야 한다
- 파티셔너를 확장한 클래스를 구현해야 사용가능
HashPartitioner
, RangePartitioner
- RDD API 에서 사용 가능한 내장형 파티셔너
- 이산형과 연속형 값을 다룰 때 사용
- 구조적 API 와 RDD 모두 사용 가능
키 치우침
: 어떤 키가 다른 키에 비해 아주 많은 데이터를 가지는 현상- 병렬ㄹ성을 개선하고 실행과정에서
OutOfMemoryError
를 방지할 수 있도록 키를 최대한 분할해야 한다
In [ ]:
반응형
'Data Engineering > Spark' 카테고리의 다른 글
[Spark] 스파크 집계연산 정리 1 (0) | 2023.11.22 |
---|---|
[Spark] EMR Spark 재시작 하기 (0) | 2022.03.03 |
[ Spark ] 스파크 간단 스터디 6 (0) | 2021.07.18 |
[ Spark ] 스파크 간단 스터디 5 (2) | 2021.07.11 |
[ Spark ] 스파크 간단 스터디 4 (0) | 2021.07.09 |