[ NiFi ] 5. Api 데이터를 가공하여 MySQL 로 보내기

2021. 5. 7. 15:54·Data Engineering/NiFi
반응형

 

 

 

1. 개요


이번에는 외부 API 를 추출하고 난 다음에 MySQL 에 넣어보는 사례를 기록하고자 한다. 

REST API를 불러와서 -> json형식으로 변환 -> 데이터베이스에 삽입 의 프로세스이다

전체적인 경로는 아래 이미지와 같다.

 

 

 

 

 

2. 외부 API -> MySQL 순서


이번 API 는 항공기 관련 정보들이 있는 aviationstack 에서 가져왔으며, url 은 아래와 같다

aviationstack.com/

 

aviationstack - Real-Time Flight Tracker & Status API

Free, Real-time Flight Status & Global Aviation Data API Flight tracker & airport timetable data web service trusted by 5.000+ of the smartest companies 250+ countries and 13.000+ airlines Scalable REST API infrastructure Get Free API Key API Documentation

aviationstack.com

 

1) API 접속에 필요한 access_key 를 자동으로 전달하는 프로세서를 만든다

 

만약 secret key 와 같은 추가적인 키가 필요할 경우 access key 를 추가하는 것과 비슷한 방법으로 추가를 해주면 된다. 

필요한 key 들을 Add Property 의 형식으로 추가해주고 이를 다음 프로세서에 전달한다.

 

 

 

추가적으로 api 를 가져올 때는 다른 key 값들이 필요한 경우가 있다.

위 예시의 경우 필요한 자료를 가져오기 위해서는 flight_status, limit  등의 token key 가 필요해서 추가적으로 기입을 해주었다.

만약에 다른 api 를 가져올 경우에는 필요한 token key 를 add property 로 추가하면 된다.

 

아래와 같이 attribute 가 뜨며 이 정보가 다음 단계의 프로세서로 전달된다

 

 

 

2) InvokeHttp 프로세서로 외부 url 를 연결하여 api 데이터를 가져온다

 

invokeHTTP 프로세서를 생성했을 시 setting 에 failure, no retry, retry를 설정한다.

 

Remote URL에 aviationstack.com에서 예시로 보여줬던 url를 기입한다.

기입할때 GenerateFlowFile에서 설정했던 access_key와 flight_status와 limit을 변수로 사용한다.

만약 다른 token key 를 사용했다면 다른 token key 와 url 을 사용하면 된다.

 

 

NiFi 의 경우에는 generateflowfile 로 전달받은 value 를 사용하려면

key 값을 ${ } 의 형태로 만들고 안에 key 값을 작성하면 key 와 연결된 value 가 매칭된다.

 

제대로 연결이 되었다면 queue 에 api 값이 머무르게 된다.

 

 

 

3) SplitJson 과 ControlRate 를 설정해준다.

 

splitJson 프로세서를 사용해서 json value 를 나누어준다.

위 이미지에 보이는 "data" :[ ] 부분을 split 해서 안에 있는 데이터를 파싱할 것이다. 

 

 

특정 Json 을 파싱하려면 JsonPath Expression 에 파싱하고자하는 부분의 key 를 입력해준다.

$.data  라는 형식으로 입력을 하는데 해당 방법은 NiFi 에서 특정 key 값을 파싱하려고 할 때, 사용하는 방식임으로 잘 알아둘 것.

ControlRate은 데이터가 후속 프로세서로 전송되는 속도를 제어하는 역할을 한다.

ControlRate settings는 failure, properties 설정을 해준다. 

 

흐르는 file 을 컨트롤 하는 것이기 때문에 Rate Control Criteria 는 flowfile count 로 해주고 

Maximum Rate 는 normal 값인 1 로 기입해주면 된다.

 

제대로 되었다면 queue 를 체크했을 때, 아래와 같이 데이터가 출력된다.

 

 

 

4) JoltTransformJSON 을 설정한다.

 

Automatically Terminate Relationships 설정시 failure 을 클릭하고, Jolt 설정을 해주기 위해 아래 advanced 를 클릭한다. 

그리고 난 후, Jolt Specification안에 바꿀 json 키값과 매칭시킬 value 값을 넣는다.

 

 

jolt specification 에 넣을 json 매칭 형식

[{
 "operation": "shift",
 "spec": {
 "flight_date": "flight_date",
 "flight_status": "flight_status",
 "departure": {
 "airport": "depAirport",
 "timezone": "arrTimezone"
    },
   "arrival": {
    "airport": "depAirport",
 "timezone": "arrTimezone",
     "terminal": "arrTerminal",
     "gate": "arrGate"
    },
   "airline": {
    "name": "airlineName"
   },
   "flight": {
     "number": "flightNumber"
    },
   "aircraft": {
     "registration": "aircraftRegistration"
    },
   "live": {
     "updated": "LiveUpdated",
     "latitude": "latitude",
     "longitude": "longitude",
     "altitude": "altitude",
     "direction": "direction",
      "speed_horizontal": "speedHorizontal",
     "speed_vertical": "speedVertical",
     "is_ground": "isGround"
    }
  }
}]

 

데이터가 제대로 매칭이 되었다면 아래와 같이 확인할 수 있다.

 

 

5) ConvertJsonToSQL 과 PutSQL 프로세서 설정

 

해당 설정방법에 대해서는 이전 포스팅에서 설명을 했으므로, 아래 링크 참고하여 설정하면 된다.

snepbnt.tistory.com/410

 

[ NiFi ] 4. 리눅스에서 MySql 로 데이터 보내기

1. 개요 기본적인 튜토리얼에서 벗어나 NiFi 를 통해 좀 더 심화된 프로세스를 구축하는 연습을 하기위해 해당 포스팅을 작성했다. 추후 NiFi 를 사용할 일이 있을 때, 해당 포스팅을 참고하여 연습

snepbnt.tistory.com

 

성공을 하면 최종적으로 MySQL 에 아래와 같이 데이터가 들어간다

 

 

 

 

 

3. 참고 링크


자세한 사항은 아래 링크를 참고하기 바란다

 

velog.io/@modsiw/NiFi-invokeHTTP%EB%A1%9C-REST-API%EB%A5%BC-%EB%B0%9B%EC%95%84%EC%99%80%EC%84%9C-DB%EC%97%90-INSERT%ED%95%98%EA%B8%B0

 

[NiFi] invokeHTTP로 REST API를 받아와서 DB에 INSERT하기

전체적인 흐름 REST API를 불러와서 -> json형식으로 변환 -> 데이터베이스에 삽입 환경설정 OS : CentOS 7 DBMS : MySQL 전체적인 흐름 불러올 open api 고르기 필자는 https://aviationstack.com 사이트에서 비행

velog.io

 

반응형
저작자표시 비영리 변경금지 (새창열림)

'Data Engineering > NiFi' 카테고리의 다른 글

[ NIFI ] 7. Apache NiFi Overview  (0) 2021.05.20
[ NiFi ] 6. Json 파일을 CSV 파일로 변환하기  (0) 2021.05.11
[ NiFi ] 4. 리눅스에서 MySql 로 데이터 보내기  (0) 2021.04.30
[ NiFi ] 3. NiFi 튜토리얼  (0) 2021.04.28
[ NiFi ] 2-3. NiFi 설치하기 - Docker  (0) 2021.04.27
'Data Engineering/NiFi' 카테고리의 다른 글
  • [ NIFI ] 7. Apache NiFi Overview
  • [ NiFi ] 6. Json 파일을 CSV 파일로 변환하기
  • [ NiFi ] 4. 리눅스에서 MySql 로 데이터 보내기
  • [ NiFi ] 3. NiFi 튜토리얼
쟈누
쟈누
Ad astra per aspera
    반응형
  • 쟈누
    쟈누의 기록공간
    쟈누
  • 전체
    오늘
    어제
    • 분류 전체보기 (444)
      • AWS (31)
        • Glue (4)
        • S3 (1)
      • 클라우드 (0)
      • Data Engineering (37)
        • GitHub (10)
        • NiFi (11)
        • Spark (10)
        • Snowflake (0)
        • 머신러닝, AI (6)
      • 언어 (118)
        • 데이터 베이스 (42)
        • JAVA (9)
        • Python (34)
        • Java Script (15)
        • Linux (18)
      • 프로젝트, 인강 그리고 책 (30)
        • Spotify Project (7)
        • RASA chatbot Project (9)
        • Naver shopping Project (6)
        • 빅데이터를 지탱하는 기술 (8)
      • OLD (56)
        • IT 용어 사전 (13)
        • Front End (12)
        • Back End (31)
      • Error code 모음 (165)
        • 1. SQL errors (17)
        • 2. Hadoop errors (20)
        • 3. Linux Errors (14)
        • 4. Python errors (33)
        • 5. JAVA, Spring errors (41)
        • 6. Jav Script errors (10)
        • 7. Dev Tools errors (9)
        • 8. Git errors (8)
        • 9. Jenkins Errors (4)
        • 10. airflow Errors (2)
        • 11. Aws errors (7)
      • 개인 (1)
        • 책 (1)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
    • 블로그 관리
    • 글쓰기
  • 링크

  • 공지사항

    • 간단한 블로그 소개
  • 인기 글

  • 태그

    리눅스
    API
    MySQL
    json
    node
    Python
    java
    NiFi
    Git
    설치
    Spring
    에러
    자바
    AWS
    error
    install
    python error
    linux
    파이썬
    SQL
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.3
쟈누
[ NiFi ] 5. Api 데이터를 가공하여 MySQL 로 보내기
상단으로

티스토리툴바