Data Engineering/NiFi

[ NiFi ] 6. Json 파일을 CSV 파일로 변환하기

쟈누이 2021. 5. 11. 18:09
반응형

 

1. 개요


Json 파일을 CSV 파일로 변환하여 특정 경로에 저장하는 프로세스를 기록하고자 한다.

이전 게시물에서 사용했던 프로세서와 일부 외부에서 일부 내용을 참고했다. 

 

전체적인 프로세스는 아래 이미지와 같다

 

 

 

2. JSON to CSV


1) 파일 가져오기

GetFile 프로세서는 특정 경로에 있는 파일을 가져오는 프로세서이다. 환경설정에 들어가서 properties 를 클릭하면 input directory 라는 property 에 input 하고자 하는 파일들이 있는 경로를 지정해주면 알아서 파일을 가져온다

 

 

 

2) split json (json 파일 분할하기)

 

이번에 사용한 json 파일의 내용은 아래와 같다

 {

  "person":[
     { "name": "Bob",
       "age": "16",
       "employed": "No"
     },
     { "name": "Vinny",
       "age": "56",
       "employed": "Yes"
     }
  ]
}    

그리고 프로세스틑 다음과 같다. 5번 포스팅에서 json 을 변환할때, 적용한적이 있는 프로세스이다.

 

특정 Json 을 파싱하려면 JsonPath Expression 에 파싱하고자하는 부분의 key 를 입력해준다.

$.data  라는 형식으로 입력을 하는데 해당 방법은 NiFi 에서 특정 key 값을 파싱하려고 할 때, 사용하는 방식임으로 잘 알아둘 것.

ControlRate은 데이터가 후속 프로세서로 전송되는 속도를 제어하는 역할을 한다.

ControlRate settings는 failure, properties 설정을 해준다. 

 

흐르는 file 을 컨트롤 하는 것이기 때문에 Rate Control Criteria 는 flowfile count 로 해주고 

Maximum Rate 는 normal 값인 1 로 기입해주면 된다.

제대로 되었다면 queue 를 체크했을 때, 아래와 같이 데이터가 출력된다.

 

 

 

3) JSON to CSV - ConvertRecord 프로세서 생성

 

JSON 을 CSV 로 변환하기위해 필요한 프로세서는 ConvertRecord 프로세서 이다.

 

json 을 csv 으로 변환하기 위한 convert record 프로세스를 생성한다.

이 프로세스는 특정 확장자명으로 파일을 convert 할 수 있다.

 

 

properties 의 record reader 를 설정해준다. 출력된 데이터를 어떤 확장자로 convert 할지는 record reader 에서 따로 설정할 수 없으므로 특정 확장자를 처리할 수 있는 reader 를 만들어주어야 한다.

 

operate 섹션의 configureation 을 클릭한 다음, 

' + ' 부분을 클릭한다. 

나는 json 파일을 csv 파일로 convert 시킬 것이기 때문에 controller service 로 AvroSchemaRegistry, CSVRecordSetWriter, JsonTreeReader를  만든다.

 

3-1) AvroSchemaRegistry 설정

 

이 그룹 환경설정은 avro, json, csv 등의 확장자들이 그들이 가지고 있는 스키마에 접근하고 등록하게 도와준다. 즉 이 레지스트리를 등록하면 추후 CSVRecordSetWriter, JsonTreeReader 가 json 파일에 접근할 수 있게 도와준다.

 

 

demo_schema 라는 property 를 만들어서 json 파일과 관련된 스키마를 등록한다.

 

3-2) CSVRecordSetWriter 등록

 

아래 빨간색 점으로 표시한 부분의 정보를 입력한다. 앞서 만들었던 AvroSchemaRegistry 와 거기에 등록한 schema 정보를 등록하면 된다. 

json 을 변환하는 것이기 때문에 schema access strategy 는 Use 'Schema Name' Property 로 설정한다.

 

3-3) JsonTreeReader 등록

 

3-2 의 CSVRecordSetWriter 와 똑같이 등록하면 된다. json key를 읽어들여 파싱하는데 도움을 주는 환경 설정이다.

 

위 설정들을 마무리하고 난 뒤  convertRecord 프로세서에 아래와 같이 등록한다

 

잘 파싱이 되면 아래와 같이 CSV 형식으로 데이터가 출력이 된다.

 

 

 

4) CSV 형식을 CSV 확장자 파일로 저장하기

 

특정 확장자로 저장해주기 위해서 아래의 updateAttribute 프로세서를 사용한다.

filename 이라는 property 를 만들어 아래와 같이 nifi 만의 언어를 사용하여 파일이 만들어질 때, 특정 형태로 파일이 만들어지도록 설정한다

위에 대한 설명을 찾던중 잘 위 filename 에 대해 잘 설명해 놓은 블로그가 있어 아래 내용을 가져왔다.

 

  • filename filename 은 모든 Flowfile의 공통 Attributes중 하나입니다.
  • 해당 Attributes(filename)은 따로 값을 주지 않을시에 유니크한 임의 숫자로 된 값이 할당됩니다.
  • 해당 플로우에서는 NiFi Expression Language 를 사용하여 테이블이름과 현재 시간을 조합하여 파일이름을 결정하도록 설정하였습니다. 
  • NiFi Expression Language는 Flowfile의 Attribute들을 사용하여 원하는 값을 구하도록 도와주는 NiFi 내장 언어입니다. 자세한 문법의 내용은 방대하므로 공식 문서 https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html 를 참조하십시오.

 

 

5) Put 데이터

 

이 프로세서는 간단하다. 한마디로 파일을 특정 경로에 넣는 역할을 가진 프로세서이다.

directory property 에 파일을 넣고자하는 경로를 입력하면 된다.

 

 

아래와 같이 파일이 만들어진다

하지만, 여기는 파일 설정을 잘못해서 여러개의 json 이 파싱이 되었지만 한개의 파일만 만들어졌다. 이 부분은 추후 구글링을 통하여 고칠 예정이다. 


만들어진 파일을 열어보면 아래와 같이 최종적으로 json 파일이 csv 파일로 변환된 것을 확인할 수 있다

 

 

 

3. 참고 링크


gist.github.com/cheerupdi/4c9014022df5fa09ea4fcffe5396add5

 

7. NiFi CSV to MongoDB

7. NiFi CSV to MongoDB. GitHub Gist: instantly share code, notes, and snippets.

gist.github.com

nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.6.0/org.apache.nifi.processors.standard.PutFile/

 

PutFile

PermissionsSets the permissions on the output file to the value of this attribute. Format must be either UNIX rwxrwxrwx with a - in place of denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644). You may also use expression language such as ${f

nifi.apache.org

 

반응형