반응형
CHAPTER 8. 조인¶
- 다양한 데이터셋 조합으로 조인은 거의 모든 스파크 작업에 필수
8.1 조인표현식¶
- 키값을 비교하여 데이터셋과 오른쪽 데이터 셋의 결합여부를 결정하는 조인 표현식의 평가 결과에 따라 두개의 데이터 셋을 조인
- 가장 많이 사용하는 조인 : 동등 조인(왼.오른쪽 데이터셋에 지정된 키가 동일한지 비교)
- 일치하는 키가 없는 로우는 조인에 포함시키지 않음
- 복합 데이터 타입을 조인에 사용할 수 있음
8.2 조인 타입¶
- 데이터셋에 어떤 데이터가 있어야 하는지 결정
In [1]:
//스칼라 코드
val person = Seq(
(0, "Bill Chambers", 0, Seq(100)),
(1, "Matei Zaharia", 1, Seq(500, 250, 100)),
(2, "Michael Armbrust", 1, Seq(250, 100)))
.toDF("id", "name", "graduate_program", "spark_status")
val graduateProgram = Seq(
(0, "Masters", "School of Information", "UC Berkely"),
(1, "Masters", "EECS", "UC Berkely"),
(2, "PH.D", "EECS", "UC Berkely"))
.toDF("id", "degree", "department", "school")
val sparkStatus = Seq(
(500, "Vice President"),
(250, "PMC Member"),
(100, "Contributor")
).toDF("id", "status")
Intitializing Scala interpreter ...
Spark Web UI available at http://win10-210321GO:4040 SparkContext available as 'sc' (version = 2.4.8, master = local[*], app id = local-1625656663028) SparkSession available as 'spark'
Out[1]:
person: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields] graduateProgram: org.apache.spark.sql.DataFrame = [id: int, degree: string ... 2 more fields] sparkStatus: org.apache.spark.sql.DataFrame = [id: int, status: string]
8.3 내부 조인¶
- 데이터 프레임이나 테이블에 존재하는 키를 평가
- 참(true)으로 평가되는 로우만 결합
In [2]:
//스칼라 코드
val joinExpression = person.col("graduate_program") === graduateProgram.col("id")
Out[2]:
joinExpression: org.apache.spark.sql.Column = (graduate_program = id)
- 데이터 프레임 모두에 키가 존재하지 않으면 결과 데이터프레임에서 볼수 없음
In [3]:
//스칼라 코드
val wrongJoinExpression = person.col("name") === graduateProgram.col("school")
Out[3]:
wrongJoinExpression: org.apache.spark.sql.Column = (name = school)
In [4]:
person.join(graduateProgram, joinExpression).show()
+---+----------------+----------------+---------------+---+-------+--------------------+----------+ | id| name|graduate_program| spark_status| id| degree| department| school| +---+----------------+----------------+---------------+---+-------+--------------------+----------+ | 0| Bill Chambers| 0| [100]| 0|Masters|School of Informa...|UC Berkely| | 2|Michael Armbrust| 1| [250, 100]| 1|Masters| EECS|UC Berkely| | 1| Matei Zaharia| 1|[500, 250, 100]| 1|Masters| EECS|UC Berkely| +---+----------------+----------------+---------------+---+-------+--------------------+----------+
- join 메서드의 세번째 파라미터(joinType) 으로 조인타입을 명확하게 지정가능
In [5]:
var joinType = "inner"
person.join(graduateProgram, joinExpression, joinType).show()
+---+----------------+----------------+---------------+---+-------+--------------------+----------+ | id| name|graduate_program| spark_status| id| degree| department| school| +---+----------------+----------------+---------------+---+-------+--------------------+----------+ | 0| Bill Chambers| 0| [100]| 0|Masters|School of Informa...|UC Berkely| | 2|Michael Armbrust| 1| [250, 100]| 1|Masters| EECS|UC Berkely| | 1| Matei Zaharia| 1|[500, 250, 100]| 1|Masters| EECS|UC Berkely| +---+----------------+----------------+---------------+---+-------+--------------------+----------+
Out[5]:
joinType: String = inner
8.4 외부 조인¶
- 데이터 프레임이나 테이블에 존재하는 키를 평가하여 참, 거짓으로 평가한 로우 포함(조인)
- 일치하는 로우가 없다면 해당위치 null 삽입
In [6]:
joinType = "outer"
person.join(graduateProgram, joinExpression, joinType).show()
+----+----------------+----------------+---------------+---+-------+--------------------+----------+ | id| name|graduate_program| spark_status| id| degree| department| school| +----+----------------+----------------+---------------+---+-------+--------------------+----------+ | 1| Matei Zaharia| 1|[500, 250, 100]| 1|Masters| EECS|UC Berkely| | 2|Michael Armbrust| 1| [250, 100]| 1|Masters| EECS|UC Berkely| |null| null| null| null| 2| PH.D| EECS|UC Berkely| | 0| Bill Chambers| 0| [100]| 0|Masters|School of Informa...|UC Berkely| +----+----------------+----------------+---------------+---+-------+--------------------+----------+
Out[6]:
joinType: String = outer
8.5 왼쪽 외부 조인¶
- 왼쪽 DF 의 모든 로우와 왼쪽 DF 와 일치하는 오른쪽 DF의 로우를 함께 포함
- 오른쪽 DF 에 일치하는 로우가 없다면 해당위치 null 삽입
In [7]:
joinType = "left_outer"
graduateProgram.join(person, joinExpression, joinType).show()
+---+-------+--------------------+----------+----+----------------+----------------+---------------+ | id| degree| department| school| id| name|graduate_program| spark_status| +---+-------+--------------------+----------+----+----------------+----------------+---------------+ | 0|Masters|School of Informa...|UC Berkely| 0| Bill Chambers| 0| [100]| | 1|Masters| EECS|UC Berkely| 2|Michael Armbrust| 1| [250, 100]| | 1|Masters| EECS|UC Berkely| 1| Matei Zaharia| 1|[500, 250, 100]| | 2| PH.D| EECS|UC Berkely|null| null| null| null| +---+-------+--------------------+----------+----+----------------+----------------+---------------+
Out[7]:
joinType: String = left_outer
오른쪽 외부 조인¶
- 오른쪽 DF 의 모든 로우와 오른쪽 DF 와 일치하는 왼쪽 DF 의 로우를 함께 포함
- 온쪽 DF 에 일치하는 로우가 없다면 해당 위치 null 삽입
In [8]:
joinType = "right_outer"
person.join(graduateProgram, joinExpression, joinType).show()
// -- SQL
// SELECT *
// FROM person
// RIGHT OUTER JOIN graduateProgram
// ON person.graduate_program = graduateProgram.id;
+----+----------------+----------------+---------------+---+-------+--------------------+----------+ | id| name|graduate_program| spark_status| id| degree| department| school| +----+----------------+----------------+---------------+---+-------+--------------------+----------+ | 0| Bill Chambers| 0| [100]| 0|Masters|School of Informa...|UC Berkely| | 2|Michael Armbrust| 1| [250, 100]| 1|Masters| EECS|UC Berkely| | 1| Matei Zaharia| 1|[500, 250, 100]| 1|Masters| EECS|UC Berkely| |null| null| null| null| 2| PH.D| EECS|UC Berkely| +----+----------------+----------------+---------------+---+-------+--------------------+----------+
Out[8]:
joinType: String = right_outer
8.7 왼쪽 세미 조인¶
- 오른쪽 DF 의 어떤 값도 포함하지 않기 때문에 다른 조인타입과 약간 다름
- 두번째 DF 는 값이 존재하는지 확인하기 위해 값만 비교하는 용도로 사용
- 값 존재시, 왼쪽 DF 에 중복키가 존재하더라도 해당 로우는 결과 포함
- DF 의 필터로 보면 됨
In [9]:
joinType = "left_semi"
graduateProgram.join(person, joinExpression, joinType).show()
//select *
//from gradProgram2
//left semi join person
//on gradProgram2.id = person.graduate_program
+---+-------+--------------------+----------+ | id| degree| department| school| +---+-------+--------------------+----------+ | 0|Masters|School of Informa...|UC Berkely| | 1|Masters| EECS|UC Berkely| +---+-------+--------------------+----------+
Out[9]:
joinType: String = left_semi
8.8 왼쪽 안티 조인¶
- 왼쪽 세미조인의 반대 개념
- 오른쪽 DF 의 어떠한 값도 포함하지 않음
- 두번째 DF 는 값이 존재하는지 확인위해 값을 비교한느 용도
- 두번재 DF 에 존재하는 값을 유지하는 대신 두번째 DF에서 관련된 키를 찾을 수 없는 로우만 포함
- SQL 의 NOT IN 같은 스타일의 필터
In [10]:
joinType = "left_anti"
graduateProgram.join(person, joinExpression, joinType).show()
+---+------+----------+----------+ | id|degree|department| school| +---+------+----------+----------+ | 2| PH.D| EECS|UC Berkely| +---+------+----------+----------+
Out[10]:
joinType: String = left_anti
8.9 자연 조인¶
- 조인하려는 컬럼을 암시적으로 추정
- 일치하는 컬럼을 찾고 그 결과를 반환
- 왼/오/외부 자연 조인 사용 가능
8.10 교차 조인(카테시안 조인)¶
- 조건절을 기술하지 않은 내부 조인
- 왼쪽 DF 의 모든 로우를 오른쪽 DF 의 모든 로우와 결합
- 엄청난 수의 로우 생성 가능
- 그러기에, 반드시 키워드를 이용해 교차 조인을 수행한다는 것을 선언해야함
In [11]:
joinType = "cross"
graduateProgram.join(person, joinExpression, joinType).show()
+---+-------+--------------------+----------+---+----------------+----------------+---------------+ | id| degree| department| school| id| name|graduate_program| spark_status| +---+-------+--------------------+----------+---+----------------+----------------+---------------+ | 0|Masters|School of Informa...|UC Berkely| 0| Bill Chambers| 0| [100]| | 1|Masters| EECS|UC Berkely| 2|Michael Armbrust| 1| [250, 100]| | 1|Masters| EECS|UC Berkely| 1| Matei Zaharia| 1|[500, 250, 100]| +---+-------+--------------------+----------+---+----------------+----------------+---------------+
Out[11]:
joinType: String = cross
In [12]:
person.crossJoin(graduateProgram).show()
+---+----------------+----------------+---------------+---+-------+--------------------+----------+ | id| name|graduate_program| spark_status| id| degree| department| school| +---+----------------+----------------+---------------+---+-------+--------------------+----------+ | 0| Bill Chambers| 0| [100]| 0|Masters|School of Informa...|UC Berkely| | 1| Matei Zaharia| 1|[500, 250, 100]| 0|Masters|School of Informa...|UC Berkely| | 2|Michael Armbrust| 1| [250, 100]| 0|Masters|School of Informa...|UC Berkely| | 0| Bill Chambers| 0| [100]| 1|Masters| EECS|UC Berkely| | 1| Matei Zaharia| 1|[500, 250, 100]| 1|Masters| EECS|UC Berkely| | 2|Michael Armbrust| 1| [250, 100]| 1|Masters| EECS|UC Berkely| | 0| Bill Chambers| 0| [100]| 2| PH.D| EECS|UC Berkely| | 1| Matei Zaharia| 1|[500, 250, 100]| 2| PH.D| EECS|UC Berkely| | 2|Michael Armbrust| 1| [250, 100]| 2| PH.D| EECS|UC Berkely| +---+----------------+----------------+---------------+---+-------+--------------------+----------+
8.12 스파크의 조인 수행 방식¶
- 노드간 네트워크 종신 전략
- 노드별 연산 전략 두가지가 잇음
8.12.1 네트워크 통신 전략¶
- 조인 시 두가지 클러스터 통신 방식을 사용
- 서플조인, 브로드캐스트 조인 : 노드간의 통신을 유발
큰 테이블과 큰 테이블 조인¶
셔플 조인
은 전체 노드간 통신이 발생- 특정 키, 특정 집합을 어떤 노드가 가졌는지에 따라 해당 노드와 데이터 공유
- 이러한 방식 때문에 복잡하고, 많은 자원 사용
- 모든 워크노드에서 통신이 발생
큰테이블과 작은 테이블 조인¶
- 단일 워커 노드의 메모리 크기에 적합할 정도로 충분히 작은 경우 조인 연산 최적화 가능
브로드 케스트 조인
이 훨씬 효율적- 시작시 단 한번만 족제 수행, 개별워커가 다른 워커 노드를 기다리거나 필요없이 작업 수행 가능
- 대규모 노드간 통신이 발생
- 노드 사이에 추가적인 통신이 발생하지는 않음
- 모든 단일 노드에서 개별적으로 조인이 수행되므로 CPU 가 가장 큰 병목구간이 됨
In [13]:
val joinExpr = person.col("graduate_program") === graduateProgram.col("id")
person.join(graduateProgram, joinExpr).explain()
== Physical Plan == *(1) BroadcastHashJoin [graduate_program#11], [id#26], Inner, BuildLeft :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, false] as bigint))) : +- LocalTableScan [id#9, name#10, graduate_program#11, spark_status#12] +- LocalTableScan [id#26, degree#27, department#28, school#29]
Out[13]:
joinExpr: org.apache.spark.sql.Column = (graduate_program = id)
단점¶
- 너무 큰 데이터를 브로드캐스트하면 고비용의 수집연산이 발생하여 드라이버 노드가 비정산적으로 종료될 수 있음
8.13 정리¶
- 조인 전에 데이터를 적절하게 분할하면 셔플이 계획되어 있더라도 동일한 머신에 두 DF 의 데이터가 있을 수 있음
- 셔플을 피할 수 있고 훨씬 더 효율적으로 실행할 수 있음
- 필터 임무 수행으로 네트워크의 교환 데이터를 줄여 워크로드의 성능 향상 가능
In [ ]:
반응형
'Data Engineering > Spark' 카테고리의 다른 글
[ Spark ] 스파크 간단 스터디 6 (0) | 2021.07.18 |
---|---|
[ Spark ] 스파크 간단 스터디 5 (2) | 2021.07.11 |
[ Spark ] 스파크 간단 스터디 4 (0) | 2021.07.09 |
[Spark] 스파크 간단 스터디 2 (0) | 2021.06.27 |
[Spark] 스파크에 대한 간단 스터디 1 (0) | 2021.06.19 |