GCP Dataflow SQL로 쉽게 Streaming Data를 처리하는 Data pipeline 구성하기
1. Data Pipeline이란
Data의 홍수라고 불러도 될 만큼 정보가 넘쳐나는 시대에서, 어떤 Data를 어떻게 처리하는지에 대한 관심과 필요성은 날로 증대되어왔습니다.
이렇게 Big data, Data analytics에 사용되는 Data processing이 이제는 하나의 중대한 과제로 떠오르면서 Data pipeline이라고 불리우는 개념이 등장했습니다.
Data pipeline은 Data analysis를 위해 Input data를 받아 다른 Output으로 병렬, 혹은 직렬적으로 처리해 최종적으로 Storage에 저장하는 일련의 과정을 뜻합니다.
ETL(Extract, Transform, Load) Process라고도 하는 이 Data processing 과정을 처리하기 위해 Date pipeline은 여러 곳에서 취합한 Law data들을 가공하고, 변형해서 알맞은 Storage에 저장하게 됩니다.
하지만 이 과정을 수행하기 위해서는 각각 Extract, Transform, Load에 해당하는 과정을 다룰 수 있어야 하기 때문에 Data pipeline 구성은 결코 쉬운 작업이 아니었습니다.
물론 GCP는 Data pipeline을 Cloud 환경에서 쉽게 구성해줄 수 있는 Product들을 제공하고 있습니다.
위 그림과 같이 Pub/sub, Dataflow, GCS, Dataproc, bigQuery 등 다양한 Product들이 Data pipeline의 구성에 사용될 수 있습니다.
그럼에도 불구하고 수 많은 Product중 어떤 것을 사용해야 하는지, 그 Product를 어떻게 사용해야 하는지 처음부터 배워야 한다는 진입장벽이 존재해 여전히 Data pipeline의 구성은 쉽지 않습니다.
저 또한 GCP 환경에서 Streaming data의 Data pipeline 구성 중 같은 장벽에 부딪혀 고민했던 적이 있었습니다.
그 때 제가 찾은 해답은 Dataflow SQL이었습니다.
2. Dataflow SQL 이란
Dataflow SQL이란 GCP의 Data warehouse product인 bigQuery에서 Query 형식으로 Dataflow job을 실행할 수 있게 해주는 기능입니다.
기존의 Dataflow는 ETL process를 위해 GCP에서 제공하는 Template을 사용할 수 밖에 없거나 자체적으로 custom template을 만들어야 한다는 한계 때문에 작업에 난이도가 있었지만,
Dataflow SQL은 이 작업을 친숙한 Query syntax로 처리할 수 있게 만들어주기 때문에 작업 난이도를 대폭 낮춰준다는 장점이 있습니다.
Dataflow SQL에서 사용되는 Query syntax는 ZetaQuery라는 query language의 일부를 차용하고 있지만 bigQuery의 query syntax와 상당히 유사하기 때문에 기존 bigQuery를 이용했던 사용자라면 어려움 없이 Data pipeline을 구성할 수 있습니다.
Streaming job도 지원하기 때문에 Pub/sub같이 끊임없이 들어오는 Data들을 받아서 지속적으로 처리하는 Long-running operation도 수행할 수 있습니다.
본 포스팅에서는 Pub/sub에서 들어오는 Streaming data를 Dataflow sql로 처리해서 bigQuery로 저장하는 일련의 Pipeline을 구성해보도록 하겠습니다.
3. Data pipeline의 구성요소
Data analysis를 수행하는데 중요한 요소 중 하나로 Data가 얼마나 최신화 되어있는지를 뜻하는 Data Freshness가 있습니다.
가능한 최신의 Data를 토대로 분석하는 것이 분석의 질을 좌우하기 떄문에, 더 나은 Data analysis를 위해서는 Batch process로 일정 시간마다 Data를 처리하기 보다는 Streaming process로 지속적으로 Data를 제공할 수 있는 Data pipeline이 필요합니다.
위의 Architecture는 언급한 Streaming process를 처리할 수 있는 Data pipeline을 구성한 것입니다.
구성 요소에는
1. Data를 지속적으로 제공하는 Publisher
2. Publisher가 제공하는 Message를 받는 Pub/sub
3 Pub/sub의 Message를 받아 bigQuery에 insert해주는 Dataflow
4. 마지막으로 Analysis를 위해 Data를 저장할 bigQuery가 있습니다.
Dataflow SQL은 여기서 Dataflow의 생성 작업을 대신해 Pub/sub의 Message를 Query syntax로 가공하는 Dataflow job을 수행하게 됩니다.
4. Dataflow SQL로 Data pipeline을 구성하는 방법
이제 본격적으로 Dataflow SQL을 사용해 Data pipeline을 구성해보겠습니다.
Dataflow SQL을 사용하기 이전에 Message를 받을 Pub/sub topic과 최종적으로 Data가 저장될 bigQuery dataset을 생성해야 합니다.
4-1. Query engine 설정
Pub/sub과 bigQuery dataset을 생성한 뒤, bigQuery web console UI로 진입한 후 More -> query settings를 클릭해 Cloud Dataflow engine을 query engine으로 설정합니다.
위와 같이 Cloud Dataflow engine을 지정하면 bigQuery에서 실행하는모든 query는 Dataflow SQL이 처리하게 되며, Dataflow job을 생성해 Processing을 진행하게 됩니다.
4-2. Data source 추가
그 후 Resource 탭의 ADD DATA를 클릭하면 이전에 보이지 않던 "Cloud Dataflow sources"라는 항목이 나타납니다.
여기서 Dataflow SQL이 처리할 Data source를 지정할 수 있습니다.
현재는 지원하는 Source로 GCS와 Pub/sub이 있습니다. 본 포스팅에서는 Pub/sub에서 온 Message를 처리해야 하므로 이전에 생성한 Pub/sub topic을 선택합니다.
4-3. Pub/sub topic 확인
Pub/sub topic을 source로 추가했다면 Dataset list에 아래와 같이 Cloud Pub/sub topic들의 list를 볼 수 있습니다.
Dataset에 Pub/sub을 추가했다고 바로 Query작업을 수행할 수 없습니다.
Pub/sub에서 오는 Message를 bigQuery Schema에 맞게 변형해주는 작업을 Dataflow에서 처리해야 하기 때문에
Pub/sub에서 받을 Message와 동일한 구조의 Schema를 미리 지정해야 정상적으로 Processing을 수행할 수 있습니다.
4-4. Pub/sub Schema 지정
지정한 Pub/sub를 활성화한 뒤 Edit schema를 클릭하면 bigQuery table과 동일하게 schema를 지정할 수 있는 탭이 등장합니다.
위는 지정한 Pub/sub schema의 예시입니다.
유의해야 할 점은 맨 위의 event_timestamp는 꼭 존재해야 한다는 것입니다. Timestamp column이 없을 시 Dataflow job 실행과정에서 Error가 발생합니다.
Schema를 설계하는 방법은 Pub/sub Message는 Json format만 가질 수 있기 때문에, Key:value 쌍으로 이루어진 Json object에서 Key에 해당하는 이름을 Column으로 변환한다고 생각하면 됩니다.
Json object의 Array는 Repeated Mode로, 중첩된 Key는 Struct type으로 지정해야 합니다.
4-5. Query 실행
Pub/sub의 Schema까지 지정했다면 이제 Query를 실행해 Dataflow job을 생성할 수 있습니다.
Query문은 위와 같이 일반적으로 사용하는 Query syntax를 사용해도 크게 문제되지 않습니다.
기존의 Dataflow는 Javascript function을 통해 Code로써 변환작업을 지정해야 했지만,
Dataflow SQL은 위와 같이 친숙한 Query문을 사용해서 Data를 쉽게 변환할 수 있다는 장점이 있습니다.
Query문을 작성한 뒤 Create Dataflow job을 클릭하면 Dataflow job을 생성하는 탭이 등장합니다.
4-6. Dataflow job 생성
Dataflow job 생성 탭에서 Processing된 data의 destination과 dataflow job을 처리할 vm의 region 등 다양한 설정을 할 수 있습니다.
본 포스팅에서는 Data pipeline의 목적지가 bigQuery table이므로 destination으로 적절한 bigQuery table을 지정합니다.
그 외에도 Pub/sub을 Destination으로 지정해 변환된 Data를 다른 Message bus로 연결할 수도 있습니다.
하단의 "additional output"을 활성화하면 최대 2개의 Destination을 지정할 수 있어 하나의 source에서 오는 Data를 여러 목적지로 복사해 넣을 수도 있습니다.
설정을 완료했다면 "Create" 버튼을 클릭해 Dataflow job을 생성합니다.
4-7. Dataflow job 확인
이후 Dataflow로 진입하면 아래와 같은 operation으로 구성된 Dataflow job이 생성된 것을 볼 수 있습니다.
실제 Streaming data를 처리하는 데에는 생성 후 3분 정도가 소요됩니다.
Job의 생성이나 Data 처리 과정에서 Error가 발생한다면 log를 통해 원인을 분석할 수 있습니다.
4-8. Data pipeline 동작 Test
Dataflow job이 정상적으로 생성되었다면 이제 모든 Data pipeline 구성이 완료되었습니다.
실제로 Publisher에서 Message를 Streaming하거나, 아래와 같이 Pub/sub topic에서 인위적으로 Test message를 보내는 방법으로 Data pipeline이 정상적으로 동작하는지 테스트할 수 있습니다.
Message를 보낸 후 bigQuery에 정상적으로 row가 추가된 것이 보인다면 Data pipeline이 정상적으로 동작한다는 것입니다.
이제 Dataflow SQL을 활용한 Data pipeline이 완성되었습니다.
여기서 수행하는 전체 ETL과정을 처리하는데 채 1분도 걸리지 않았습니다.
즉 Publisher가 Message를 보내자마자 모든 처리과정을 거쳐 bigQuery로 저장되는 과정이 거의 real-time으로 실행된다는 뜻입니다.
이렇게 Data가 생성되는 즉시 Data warehouse에 저장할 수 있게 되면 Data Analysis를 수행했을 시 항상 가장 최신의 Data를 토대로 하는 결과값을 얻을 수 있습니다.
5. 마치며
이렇게 Dataflow SQL을 이용해서 Data pipeline을 구성하는 법을 알아봤습니다.
Cloud service가 활성화되면서 이전에는 구현하기 어려웠던 다양한 기능들을 이제 쉽게 이용할 수 있게 되었습니다.
이번 Data pipeline을 구성하는데 사용한 요소들만 해도 기존에는 Apache Kafka와 Apache Beam을 On-prem의 리소스 위에 수작업으로 구현했어야 했지만, 이제는 Cloud 환경에서 편리하게 구현할 수 있습니다.
단 이제는 Data pipeline의 구성이 더 이상 높은 진입장벽이 아니게 된 만큼 Cloud engineer라면 이러한 기능들을 구현할 수 있는 것이 필수인 시대가 되었습니다.
앞으로 단순히 기능을 구현하는 것 보다는 목적을 달성하는 데 있어서 어떤 기능이 동원될 수 있는지 파악할 수 있는 역량이 더욱 중요해 질 것이라고 생각합니다.