반응형
Chapter 9. 데이터 소스¶
- 스파크에는 6가지 핵심 데이터 소스와 커뮤니티에서 만든 수백가지 외부 데이터 소스 존재
- 핵심 데이터 소스 :
CSV
,JSON
,PARQUET
,ORC
,JDBC/ODBC
,일반 TXT
- 외부 데이터 소스 :
카산드라
,HBASE
,MONGODB
,AWS REDSHIFT
,XML
,기타 소스
9.1 데이터 소스 API 구조¶
DataFrameReader.format(...).option("key", "value").chema(...).load()
- 모든 데이터 소스를 읽을 때 위와 같은 형식 사용
- format 메서드 : 선택적으로 사용할수 있으며 parquet 가 기본 포멧
- option 메서드 : 키-값 쌍으로 파라미터 설정 가능
- schema 메서드 : 데이터 소스에서 스키마 제공, 스키마 추론 기능 사용시 선택적 사용 가능
9.1.2 데이터 읽기의 기초¶
- 데이터를 읽을 때 기본적으로 DataFrameReader 를 사용
- SparkSession 의 read 속성으로 접근
spark.read
- 다음의 값을 지정해야함 : 포맷, 스키마, 읽기모드, 옵션
- 읽기 모드를 제외한 3가지 항목은 필요한 경우에만 선택적 지정 가능
- 사용자는 DataFrameReader 에 반드시 데이터를 읽을 경로 지정해야한다
spark.read.foramt('csv')
.option("mode", "FAILFAST")
.option("inferSchema", "true")
.option("path", "path/to.file(s)")
.shcema(someSchema)
.load()
- 읽기 모드는 스파크가 형식에 맞지 ㅇ낳는 데이터를 만났을 때 동작 방식을 지정하는 옵션
9.1.3 쓰기 API 구조¶
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
- partitionBy, bucketBy, sortBy 메서드는 파일 기반의 데이터소스에서만 동작
- format, option, save 모드를 지정해야 하며, 데이터가 저장될 경로를 반드시 입력해야함
dataframe.write("csv")
.option("mode", "OVERWRITE")
.option("dataFormat", "yyyy-MM-dd")
.ioption("path", "path/to/file(s)")
.save()
In [2]:
// 스칼라 코드
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
val myManualSchema = new StructType(Array(
new StructField("DEST_COUNTRY_NAME", StringType, true),
new StructField("ORIGIN_COUNTRY_NAME", StringType, true),
new StructField("count", LongType, false)
))
spark.read.format("csv")
.option("header", "true")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("C:/Users/COM/Desktop/home/study_group/1st_spark_studygroup_2106_/Spark-The-Definitive-Guide-master/data/flight-data/csv/2010-summary.csv")
.show(5)
+-----------------+-------------------+-----+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ | United States| Romania| 1| | United States| Ireland| 264| | United States| India| 69| | Egypt| United States| 24| |Equatorial Guinea| United States| 1| +-----------------+-------------------+-----+ only showing top 5 rows
Out[2]:
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType} myManualSchema: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,false))
CSV 파일 쓰기¶
In [3]:
//스칼라 코드
val csvFile = spark.read.format("csv")
.option("header", "true").option("mode", "FAILFAST").schema(myManualSchema)
.load("C:/Users/COM/Desktop/home/study_group/1st_spark_studygroup_2106_/Spark-The-Definitive-Guide-master/data/flight-data/csv/2010-summary.csv")
Out[3]:
csvFile: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]
TSV 파일로 내보내기¶
In [7]:
// 스칼라 코드
csvFile.write.format("csv").mode("overwrite").option("set", "\t")
.save("C:/Users/COM/Desktop/home/study_group/1st_spark_studygroup_2106_/Spark-The-Definitive-Guide-master/data/flight-data/csv/haha.tsv")
9.3 JSON 파일¶
- 스파크에서는 줄로 구분된 JSON 을 기본적으로 사용함
- multiLine 옵션을 사용해 줄로 구분된 방식과 여러 줄로 구성된 방식을 선택적으로 사용할 수 있음
- JSON 을 파싱한 다음에 DataFrame 을 생성
- 다른 포멧에 비해 훨씬 더 안정적인 포맷이므로 추천하는 방식
- 구조화되어 있고 최소한의 기본 데이터 타입이 존재하기 때문에 인기가 많음
JSON 파일 읽기¶
In [11]:
// 스칼라 코드
spark.read.format("json").option("mode","FAILFAST").schema(myManualSchema)
.load("C:/Users/COM/Desktop/home/study_group/1st_spark_studygroup_2106_/Spark-The-Definitive-Guide-master/data/flight-data/json/2010-summary.json").show(5)
+-----------------+-------------------+-----+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ | United States| Romania| 1| | United States| Ireland| 264| | United States| India| 69| | Egypt| United States| 24| |Equatorial Guinea| United States| 1| +-----------------+-------------------+-----+ only showing top 5 rows
In [10]:
csvFile.write.format("json").mode("overwrite")
.save("C:/Users/COM/Desktop/home/study_group/1st_spark_studygroup_2106_/Spark-The-Definitive-Guide-master/data/flight-data/json/2010-summary.json")
9.4 파케이 파일¶
- 스토리지 최적화 기술을 제공하는 오픈소스로 만들어진 컬럼 기반의 데이터 저장 방식
- 저장소 공간 절약 가능
- 전체 파일 대신 개별 컬럼 읽고, 컬럼기반의 압축 기능 제공
- 스파크와 잘 호환되어 스파크의 기본 파일 포맷
- 파케이는 읽기 연산 시 JSON, CSV 보다 효율적으로 동작하여 장기 저장용 데이터는 파케이가 좋음
- 복합 데이터 타입을 지원 (컬럼이 배열, 맵, 구조체 데이터 타입이라도 문제없이 읽고 쓸 수 있음)
- 옵션이 거의 없음(데이터 저장시 자체 스키마 사용해 데이터 저장)
In [12]:
spark.read.format("parquet")
// 스칼라 코드
spark.read.format("parquet")
.load("C:/Users/COM/Desktop/home/study_group/1st_spark_studygroup_2106_/Spark-The-Definitive-Guide-master/data/flight-data/parquet/2010-summary.parquet").show(5)
+-----------------+-------------------+-----+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ | United States| Romania| 1| | United States| Ireland| 264| | United States| India| 69| | Egypt| United States| 24| |Equatorial Guinea| United States| 1| +-----------------+-------------------+-----+ only showing top 5 rows
파케이 파일 쓰기¶
In [13]:
csvFile.write.format("parquet").mode("overwrite")
.save("C:/Users/COM/Desktop/home/study_group/1st_spark_studygroup_2106_/Spark-The-Definitive-Guide-master/data/flight-data/parquet")
9.5 ORC 파일¶
- 하둡 워크로드를 위해 설계된 자기 기술적이며 데이터 타입을 인식할 수 있는 컬럼 기반의 파일 포맷
- 대규모 스트리밍 읽기에 최적화, 필요한 로우를 신속하게 찾아낼 수있는 기능이 통합
- 하이브에 최적화
In [14]:
spark.read.format("orc")
.load("C:/Users/COM/Desktop/home/study_group/1st_spark_studygroup_2106_/Spark-The-Definitive-Guide-master/data/flight-data/orc/2010-summary.orc").show(5)
+-----------------+-------------------+-----+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ | United States| Romania| 1| | United States| Ireland| 264| | United States| India| 69| | Egypt| United States| 24| |Equatorial Guinea| United States| 1| +-----------------+-------------------+-----+ only showing top 5 rows
9.7 텍스트 파일¶
- 일반 텍스트 파일도 읽을 수 있음
- 각줄은 DF 의 레코드가 되며 변환하는 것도 마음대로 할 수 있음
텍스트 파일 읽기¶
In [17]:
spark.read.textFile("C:/Users/COM/Desktop/home/study_group/1st_spark_studygroup_2106_/Spark-The-Definitive-Guide-master/data/flight-data/csv/2010-summary.csv")
.selectExpr("split(value, ',') as rows").show()
+--------------------+ | rows| +--------------------+ |[DEST_COUNTRY_NAM...| |[United States, R...| |[United States, I...| |[United States, I...| |[Egypt, United St...| |[Equatorial Guine...| |[United States, S...| |[United States, G...| |[Costa Rica, Unit...| |[Senegal, United ...| |[United States, M...| |[Guyana, United S...| |[United States, S...| |[Malta, United St...| |[Bolivia, United ...| |[Anguilla, United...| |[Turks and Caicos...| |[United States, A...| |[Saint Vincent an...| |[Italy, United St...| +--------------------+ only showing top 20 rows
텍스트 파일 쓰기¶
In [18]:
csvFile.select("DEST_COUNTRY_NAME").write.text("C:/Users/COM/Desktop/home/study_group/1st_spark_studygroup_2106_/Spark-The-Definitive-Guide-master/data/flight-data/csv/2010-summary.txt")
9.8 고급 I/O 개념¶
- 쓰기 작업 전 파티션 수를 조절함으로 병렬로 처리할 파일 수를 제어할 수있음
- 버켓팅과 파ㅣ셔닝을 조절하면서 데이터의 저장 구조를 제어할 수 있음
- 기본적으로 분할을 지원함
- 필요한 부분만 읽을 수 있으므로 성능 향상에 도움
- 추천 포맷 : 파케이 포맷, GZIP 압축 방식
병렬로 데이터 읽기¶
- 여러 파일을 동시에 읽을 수는 있음
- 개별 파일은 DafaFrame 의 파티션이 됨
파티셔닝¶
- 어떤 데이터를 어디에 저장할 것인지 제어할 수 이쓴 기능
- 디렉터리 별로 컬럼 데이터를 인코딩해 저장
- 위의 이유로 전체를 스캔하지 않고 하루
버켓팅¶
- 파일에 저장된 데이터를 제어할 수 있는 다른 파일 조직화 기법
- 데이터를 읽을 때 셔플을 피할 수 없음
In [ ]:
반응형
'Data Engineering > Spark' 카테고리의 다른 글
[ Spark ] 스파크 간단 스터디 6 (0) | 2021.07.18 |
---|---|
[ Spark ] 스파크 간단 스터디 5 (2) | 2021.07.11 |
[Spark] 스파크 간단 스터디 3 (0) | 2021.07.07 |
[Spark] 스파크 간단 스터디 2 (0) | 2021.06.27 |
[Spark] 스파크에 대한 간단 스터디 1 (0) | 2021.06.19 |