1. 개요
이번에는 외부 API 를 추출하고 난 다음에 MySQL 에 넣어보는 사례를 기록하고자 한다.
REST API를 불러와서 -> json형식으로 변환 -> 데이터베이스에 삽입 의 프로세스이다
전체적인 경로는 아래 이미지와 같다.
2. 외부 API -> MySQL 순서
이번 API 는 항공기 관련 정보들이 있는 aviationstack 에서 가져왔으며, url 은 아래와 같다
1) API 접속에 필요한 access_key 를 자동으로 전달하는 프로세서를 만든다
만약 secret key 와 같은 추가적인 키가 필요할 경우 access key 를 추가하는 것과 비슷한 방법으로 추가를 해주면 된다.
필요한 key 들을 Add Property 의 형식으로 추가해주고 이를 다음 프로세서에 전달한다.
추가적으로 api 를 가져올 때는 다른 key 값들이 필요한 경우가 있다.
위 예시의 경우 필요한 자료를 가져오기 위해서는 flight_status, limit 등의 token key 가 필요해서 추가적으로 기입을 해주었다.
만약에 다른 api 를 가져올 경우에는 필요한 token key 를 add property 로 추가하면 된다.
아래와 같이 attribute 가 뜨며 이 정보가 다음 단계의 프로세서로 전달된다
2) InvokeHttp 프로세서로 외부 url 를 연결하여 api 데이터를 가져온다
invokeHTTP 프로세서를 생성했을 시 setting 에 failure, no retry, retry를 설정한다.
Remote URL에 aviationstack.com에서 예시로 보여줬던 url를 기입한다.
기입할때 GenerateFlowFile에서 설정했던 access_key와 flight_status와 limit을 변수로 사용한다.
만약 다른 token key 를 사용했다면 다른 token key 와 url 을 사용하면 된다.
NiFi 의 경우에는 generateflowfile 로 전달받은 value 를 사용하려면
key 값을 ${ } 의 형태로 만들고 안에 key 값을 작성하면 key 와 연결된 value 가 매칭된다.
제대로 연결이 되었다면 queue 에 api 값이 머무르게 된다.
3) SplitJson 과 ControlRate 를 설정해준다.
splitJson 프로세서를 사용해서 json value 를 나누어준다.
위 이미지에 보이는 "data" :[ ] 부분을 split 해서 안에 있는 데이터를 파싱할 것이다.
특정 Json 을 파싱하려면 JsonPath Expression 에 파싱하고자하는 부분의 key 를 입력해준다.
$.data 라는 형식으로 입력을 하는데 해당 방법은 NiFi 에서 특정 key 값을 파싱하려고 할 때, 사용하는 방식임으로 잘 알아둘 것.
ControlRate은 데이터가 후속 프로세서로 전송되는 속도를 제어하는 역할을 한다.
ControlRate settings는 failure, properties 설정을 해준다.
흐르는 file 을 컨트롤 하는 것이기 때문에 Rate Control Criteria 는 flowfile count 로 해주고
Maximum Rate 는 normal 값인 1 로 기입해주면 된다.
제대로 되었다면 queue 를 체크했을 때, 아래와 같이 데이터가 출력된다.
4) JoltTransformJSON 을 설정한다.
Automatically Terminate Relationships 설정시 failure 을 클릭하고, Jolt 설정을 해주기 위해 아래 advanced 를 클릭한다.
그리고 난 후, Jolt Specification안에 바꿀 json 키값과 매칭시킬 value 값을 넣는다.
jolt specification 에 넣을 json 매칭 형식
[{
"operation": "shift",
"spec": {
"flight_date": "flight_date",
"flight_status": "flight_status",
"departure": {
"airport": "depAirport",
"timezone": "arrTimezone"
},
"arrival": {
"airport": "depAirport",
"timezone": "arrTimezone",
"terminal": "arrTerminal",
"gate": "arrGate"
},
"airline": {
"name": "airlineName"
},
"flight": {
"number": "flightNumber"
},
"aircraft": {
"registration": "aircraftRegistration"
},
"live": {
"updated": "LiveUpdated",
"latitude": "latitude",
"longitude": "longitude",
"altitude": "altitude",
"direction": "direction",
"speed_horizontal": "speedHorizontal",
"speed_vertical": "speedVertical",
"is_ground": "isGround"
}
}
}]
데이터가 제대로 매칭이 되었다면 아래와 같이 확인할 수 있다.
5) ConvertJsonToSQL 과 PutSQL 프로세서 설정
해당 설정방법에 대해서는 이전 포스팅에서 설명을 했으므로, 아래 링크 참고하여 설정하면 된다.
성공을 하면 최종적으로 MySQL 에 아래와 같이 데이터가 들어간다
3. 참고 링크
자세한 사항은 아래 링크를 참고하기 바란다
'Data Engineering > NiFi' 카테고리의 다른 글
[ NIFI ] 7. Apache NiFi Overview (0) | 2021.05.20 |
---|---|
[ NiFi ] 6. Json 파일을 CSV 파일로 변환하기 (0) | 2021.05.11 |
[ NiFi ] 4. 리눅스에서 MySql 로 데이터 보내기 (0) | 2021.04.30 |
[ NiFi ] 3. NiFi 튜토리얼 (0) | 2021.04.28 |
[ NiFi ] 2-3. NiFi 설치하기 - Docker (0) | 2021.04.27 |