Data Engineering/NiFi

[ NiFi ] 4. 리눅스에서 MySql 로 데이터 보내기

쟈누이 2021. 4. 30. 16:15
반응형

 

 

1. 개요


기본적인 튜토리얼에서 벗어나 NiFi 를 통해 좀 더 심화된 프로세스를 구축하는 연습을 하기위해 해당 포스팅을 작성했다. 추후 NiFi 를 사용할 일이 있을 때, 해당 포스팅을 참고하여 연습해야겠다. 

 

완성된 직후, 전체적인 프로세스는 아래와 같다.  리눅스에 있는 편집한 csv 파일을 읽어들인 후에 json 형태로 변형시키고 나서 MySql 에 저장하는 프로세스이다.  

 

 

2. 리눅스 -> MySql 프로세스 순서


우선, 해당 포스트는 리눅스 서버에 MySql 을 설치했다는 가정하에 시작한다. 리눅스 버전은 Cent OS 7 을 사용했으며, MySql 로는 MySql 8 버전을 사용했다.

 

 

1) 데이터를 리눅스 서버에 넣고 mysql 에 데이터 베이스와 테이블을 생성한다.

 

실습을 위해 필요한 데이터를 우선 다운로드 받는다.

나의 경우에는 kaggle 에 있는 imdb 리뷰 데이터 셋을 다운로드 받아서 10개의 데이터로 쪼갰다.

 

그리고 Mysql 에 imdbs 라는 데이터베이스를 생성한 다음에, 리뷰 데이터셋의 header 에 맞게 review, sentiment 라는 컬럼을 가진 imdb 라는 테이블을 만들었다. 

 

 

2) GetFiles 라는 프로세스를 생성한다

 

Get Files 프로세스는 NiFi 에서 특정 경로상에 파일을 NiFi 로 옮기는 기능을 가지고 있다. 

 

GetFile 의 경우 특정 경로에 있는 파일을 가져오는 역할을 맡고 있으므로

Input Directory 부분에 파일이 위치하는 경로를 설정해 준다

 

파일이 들어오는 지 확인하기 위해서 Wait 라는 프로세스를 만들어주고 테스트를 진행해도 된다.

 

 

3) csv 파일의 text 를 split 하는 프로세스를 생성한다

 

split text 는 텍스트를 일정 기준에 맞게 분리하는 역할을 담당하는 프로세스이다.

 

split text 프로세스를 더블클릭한 후, 몇줄씩 split 할 것인지 갯수를 지정해 준다.

 

 

큐로 확인해보면 설정한 대로 잘 쪼개진 것을 확인할 수 있다.

 

 

 

 

4) Convert Record 프로세스를 생성한다.

 

쪼갠 text 데이터를 json 으로 변환하기 위한 convert record 프로세스를 생성한다.

이 프로세스는 특정 확장자명으로 파일을 convert 할 수 있다.

 

 

properties 의 record reader 를 설정해준다. 출력된 데이터를 어떤 확장자로 convert 할지는 record reader 에서 따로 설정할 수 없으므로 특정 확장자를 처리할 수 있는 reader 를 만들어주어야 한다.

 

operate 섹션의 configureation 을 클릭한 다음, 

' + ' 부분을 클릭한다. 

우리는 text 파일을 csv 파일로 convert 시킬 것이기 때문에 controller service 로 csv reader 를 선택한다.

 

 

그 후 환경설정(톱니바퀴 모양) 을 클릭하여 properties 부분의 설정들을 조작해준다. csv 이므로 어떤 방식으로 value 들을 seperater 할 것인지, 인코딩은 무엇으로 할 것인지 등을 설정하면 된다. 

 

 

그리고 나서 빨간색 밑줄의 버튼을 클릭한 후에

 

 

 

ENABLE 버튼을 클릭하면 새로 생성한 Controller Service 을 사용할 수 있게 된다.

 

다시 Convert Record 프로세스에 들어가서 새로 생성한 CSV Reader Controller Service 를 사용하면 된다.

그럼 아래와 같이 json 형태로 잘 변형된 것을 확인할 수 있다

 

 

 

5) ConvertJsonToSQL 프로세스를 생성한다

 

프로세스의 이름과 같이 JSON 파일을 SQL 데이터베이스에 전달하는 역할을 한다.

생성 후, configure processor 에 들어가서 아래와 같이 설정한다.

 

 

JDBC Connection Pool 은 CSV Reader 를 생성한 것처럼 생성하면 된다.

자세한 내용은 아래 블로그를 참고해가며 따라하거나, 영상에 나오는 대로 따라하면 된다.

 

하지만, 영상을 따라할 경우 에러가 발생할 가능성이 크므로 가급적이면 아래 블로그 링크대로 따라하는 것을 추천한다

coding-sojin2.tistory.com/entry/DBCPConnectionPool-nifi-sql-%EC%97%B0%EA%B2%B0%ED%95%98%EA%B8%B0

 

DBCPConnectionPool (nifi sql 연결하기)

SQL Processor을 사용하려면 JDBC Connection Pool이 필요합니다. 그럼 JDBC, DBCP가 무엇인지부터 알아볼까요? 1. JDBC란? - Java Database Connectivity - 자바 프로그램이 데이터베이스와 연결되어 데이터를 주..

coding-sojin2.tistory.com

 

 

6) PutSql 프로세서 생성

 

PutSql 은 위 프로세서에서 update 또는 insert 명령을 실행할 때, sql 에 직접 넣는 프로세서이다.

 

여기서도 JDBC Connection Pool 이 필요한데 먼저 생성해준 관련 Controller Service 를 사용해준다

 

 

7) 전 프로세스를 연결한 후에, 이를 실행한다.

 

그러면 아래와 같이 데이터가 축적되는 것을 확인할 수 있다.

 

 

 

3. 정리


위 과정을 거치면 하나의 응용 프로세스 구축이 가능하다. 자세한 사항은 참고링크의 유튜브 링크를 참고했으며, 유튜브 링크는 굳이 영어 듣기에 약하더라도 영상만으로도 따라하기에 쉽고 잘 설명을 해주고 있다.

 

공부할 시에 아래 유튜브를 참고하면서 공부해도 될 것 같다.

 

 

 

 

 

4. 참고 링크


자세한 프로세스 구축 방법은 아래 유튜브를 참고했다.

www.youtube.com/watch?v=8KxcAiNdqvw

 

반응형