Data Engineering/NiFi

[ NiFi ] 5. Api 데이터를 가공하여 MySQL 로 보내기

쟈누이 2021. 5. 7. 15:54
반응형

 

 

 

1. 개요


이번에는 외부 API 를 추출하고 난 다음에 MySQL 에 넣어보는 사례를 기록하고자 한다. 

REST API를 불러와서 -> json형식으로 변환 -> 데이터베이스에 삽입 의 프로세스이다

전체적인 경로는 아래 이미지와 같다.

 

 

 

 

 

2. 외부 API -> MySQL 순서


이번 API 는 항공기 관련 정보들이 있는 aviationstack 에서 가져왔으며, url 은 아래와 같다

aviationstack.com/

 

aviationstack - Real-Time Flight Tracker & Status API

Free, Real-time Flight Status & Global Aviation Data API Flight tracker & airport timetable data web service trusted by 5.000+ of the smartest companies 250+ countries and 13.000+ airlines Scalable REST API infrastructure Get Free API Key API Documentation

aviationstack.com

 

1) API 접속에 필요한 access_key 를 자동으로 전달하는 프로세서를 만든다

 

만약 secret key 와 같은 추가적인 키가 필요할 경우 access key 를 추가하는 것과 비슷한 방법으로 추가를 해주면 된다. 

필요한 key 들을 Add Property 의 형식으로 추가해주고 이를 다음 프로세서에 전달한다.

 

 

 

추가적으로 api 를 가져올 때는 다른 key 값들이 필요한 경우가 있다.

위 예시의 경우 필요한 자료를 가져오기 위해서는 flight_status, limit  등의 token key 가 필요해서 추가적으로 기입을 해주었다.

만약에 다른 api 를 가져올 경우에는 필요한 token key 를 add property 로 추가하면 된다.

 

아래와 같이 attribute 가 뜨며 이 정보가 다음 단계의 프로세서로 전달된다

 

 

 

2) InvokeHttp 프로세서로 외부 url 를 연결하여 api 데이터를 가져온다

 

invokeHTTP 프로세서를 생성했을 시 setting 에 failure, no retry, retry를 설정한다.

 

Remote URL에 aviationstack.com에서 예시로 보여줬던 url를 기입한다.

기입할때 GenerateFlowFile에서 설정했던 access_key와 flight_status와 limit을 변수로 사용한다.

만약 다른 token key 를 사용했다면 다른 token key 와 url 을 사용하면 된다.

 

 

NiFi 의 경우에는 generateflowfile 로 전달받은 value 를 사용하려면

key 값을 ${ } 의 형태로 만들고 안에 key 값을 작성하면 key 와 연결된 value 가 매칭된다.

 

제대로 연결이 되었다면 queue 에 api 값이 머무르게 된다.

 

 

 

3) SplitJson 과 ControlRate 를 설정해준다.

 

splitJson 프로세서를 사용해서 json value 를 나누어준다.

위 이미지에 보이는 "data" :[ ] 부분을 split 해서 안에 있는 데이터를 파싱할 것이다. 

 

 

특정 Json 을 파싱하려면 JsonPath Expression 에 파싱하고자하는 부분의 key 를 입력해준다.

$.data  라는 형식으로 입력을 하는데 해당 방법은 NiFi 에서 특정 key 값을 파싱하려고 할 때, 사용하는 방식임으로 잘 알아둘 것.

ControlRate은 데이터가 후속 프로세서로 전송되는 속도를 제어하는 역할을 한다.

ControlRate settings는 failure, properties 설정을 해준다. 

 

흐르는 file 을 컨트롤 하는 것이기 때문에 Rate Control Criteria 는 flowfile count 로 해주고 

Maximum Rate 는 normal 값인 1 로 기입해주면 된다.

 

제대로 되었다면 queue 를 체크했을 때, 아래와 같이 데이터가 출력된다.

 

 

 

4) JoltTransformJSON 을 설정한다.

 

Automatically Terminate Relationships 설정시 failure 을 클릭하고, Jolt 설정을 해주기 위해 아래 advanced 를 클릭한다. 

그리고 난 후, Jolt Specification안에 바꿀 json 키값과 매칭시킬 value 값을 넣는다.

 

 

jolt specification 에 넣을 json 매칭 형식

[{
 "operation": "shift",
 "spec": {
 "flight_date": "flight_date",
 "flight_status": "flight_status",
 "departure": {
 "airport": "depAirport",
 "timezone": "arrTimezone"
    },
   "arrival": {
    "airport": "depAirport",
 "timezone": "arrTimezone",
     "terminal": "arrTerminal",
     "gate": "arrGate"
    },
   "airline": {
    "name": "airlineName"
   },
   "flight": {
     "number": "flightNumber"
    },
   "aircraft": {
     "registration": "aircraftRegistration"
    },
   "live": {
     "updated": "LiveUpdated",
     "latitude": "latitude",
     "longitude": "longitude",
     "altitude": "altitude",
     "direction": "direction",
      "speed_horizontal": "speedHorizontal",
     "speed_vertical": "speedVertical",
     "is_ground": "isGround"
    }
  }
}]

 

데이터가 제대로 매칭이 되었다면 아래와 같이 확인할 수 있다.

 

 

5) ConvertJsonToSQL 과 PutSQL 프로세서 설정

 

해당 설정방법에 대해서는 이전 포스팅에서 설명을 했으므로, 아래 링크 참고하여 설정하면 된다.

snepbnt.tistory.com/410

 

[ NiFi ] 4. 리눅스에서 MySql 로 데이터 보내기

1. 개요 기본적인 튜토리얼에서 벗어나 NiFi 를 통해 좀 더 심화된 프로세스를 구축하는 연습을 하기위해 해당 포스팅을 작성했다. 추후 NiFi 를 사용할 일이 있을 때, 해당 포스팅을 참고하여 연습

snepbnt.tistory.com

 

성공을 하면 최종적으로 MySQL 에 아래와 같이 데이터가 들어간다

 

 

 

 

 

3. 참고 링크


자세한 사항은 아래 링크를 참고하기 바란다

 

velog.io/@modsiw/NiFi-invokeHTTP%EB%A1%9C-REST-API%EB%A5%BC-%EB%B0%9B%EC%95%84%EC%99%80%EC%84%9C-DB%EC%97%90-INSERT%ED%95%98%EA%B8%B0

 

[NiFi] invokeHTTP로 REST API를 받아와서 DB에 INSERT하기

전체적인 흐름 REST API를 불러와서 -> json형식으로 변환 -> 데이터베이스에 삽입 환경설정 OS : CentOS 7 DBMS : MySQL 전체적인 흐름 불러올 open api 고르기 필자는 https://aviationstack.com 사이트에서 비행

velog.io

 

반응형