[LINE 블로그 번역] ‘캐쥬얼’한 규모의 데이터 클러스터상의 데이터 분석 처리

by just_do_neet on 2012.9.11

원문 출처 : http://tech.naver.jp/blog/?p=2127  - LINE Engineer's Blog

- 본 글은 위 링크의 LINE Engineer's Blog 를 번역 하였습니다.


LINE 서비스의 통계 분석 기반

 

 현재 저는 LINE의 통계 분석 기반 개발을 담당하고 있습니다. 기반자체는 일년도 전부터 운영되고 있으며, 저는 근래 작업으로 최근 1,2개월 정도는 주로 기반 데이터 구조 재검토와 튜닝을 담당하고 있습니다.

 

 LINE의 통계 분석 기반은, 많은 회사에서 이와 같은 시스템은 존재하겠지만, LINE의 로그 데이터나 RDB 데이터 등을 사용하여 주로 KPI에 참고되는 다양한 통계적 수치를 산출하여 표시 해 주는 시스템 입니다. 그 외에 서비스 운용을 위한 보조적인 기능 등도 몇 가지 있습니다.

 

 시스템 아키텍처에 관해, 특히 통계 처리에 관련한 처리 부분을 뽑아보면 아래와 같습니다.

Input

 

  • 유저의 조작 이력 로그

    • Redis Queue에서 취득 (Streaming)

  • 마스터 데이터 (유저 데이터, 스탬프 정보 등)

    • MySQL 에서 (Batch)

Processing 


Output

 

  • 각종 통계 결과

    • MySQL에 보존 (Batch)

 LINE 서비스는 일반적인Web 어플리케이션과는 아키텍처가 다르기 때문에 접속 로그와 같은 것은 존재 하지 않습니다. 그러므로 유저의 조작 이력 로그를 Redis Queue를 통해서 비동기로 취득하여, 그 쪽을 분석 대상으로 삼고 있습니다.
 또한 분석 처리는 주로 쓰는 조합입니다만, Hadoop과 Hive를 사용하고 있습니다.

직면 해 있던 문제
 감사하게도 많은 분들이 LINE 서비스를 사용하고 있습니다. 유저 증가에 비해서 백엔드 분석 기반으로 처리해야만 하는 데이터 사이즈도 크게 증가하고 있습니다.
 예를 들면, 유저 조작 이력 로그는 최근에는 하루에 약 700 GB 정도 사이즈를 차지합니다.

 

 

 이들 로그 데이터와 마스터 데이터를 합쳐서, 날마다 수십 종류 항목의 분석 처리를 실시하고 있습니다.


 이런 상황 때문에, 날마다 실시하는 데이터 분석 처리도 갈수록 시간이 걸리게 되어, 예를 들어 하루 한 번 실시하고 있는 큰 daily batch process는 최대 11시간반 정도의 시간이 걸리게 된 상태였습니다.

 

 

 거기서 이러한 문제를 해결 하기 위해서, 가능한 한 처리를 빨리 끝내도록 하는 분석 처리 튜닝을 실시할 필요가 있었던 것입니다.

 

튜닝에 앞서
 자 튜닝이다! 라고 단단히 마음 먹는 것은 좋습니다만, 실제 상황을 이야기 하자면 저는 팀에 막 합류한 시기로, 시스템에 뭐가 문제고 뭐가 느린건지를 정확히 파악하지 못하고 있었습니다. 이 때문에, 시작점으로서 우선은 정보를 눈으로 바로 확인 할 수 있게 하는 작업을 실행하였습니다.

 기존의 분석 기반에서는, 분석 처리의 거의 전부가 Python으로 쓰여 있어서, Python에서 Hive Thrift Server를 통해 HQL을 날리는 것으로 분석 처리를 실시하고 있었습니다. 물론 처리 로그 같은 건 비교적 제대로 출력, 보존 되고 있었습니다만, 예를 들어 각 분석 처리의 처리 시간이나 발생한 에러 등을 확인 하기 위해서는 서버에 로그인 할 필요가 있었습니다.
 이것들을 한시라도 빨리 눈으로 확인 할 수 있게 하기 위한 수단으로서 Hadoop과 친화성이 높은 워크 플로우 엔진으로 유명한 ‘Azkaban’을 채용하여 그 쪽에서 분석 처리 컨트롤을 행하도록 하였습니다.

 

 

 Azkaban에 대한 자세한 설명에 대해서는 이 글에서는 생략하지만, 본래 목적인 워크 플로우의 제어 이외에도 각각의 처리 실행 시간이 그래프로 가시화 되기 때문에 어느 처리가 느린 것인지를 특정하기가 편리합니다.
 그러므로, 기존의 분석 처리를 모두 Azkaban상에서 컨트롤 하도록 변경 하고, 처리 내용이나 소요된 처리 시간을 가시화 하여, 이것을 시작으로 튜닝 포인트를 찾기로 하였습니다.

 

튜닝 포인트
 처리 속도 상의 문제점으로 나타난 것은 아래와 같습니다.

  • 마스터 데이터 (MySQL)의 Hadoop, Hive 로의 import 처리가 느림

  • MapReduce 태스크의 가동수가 capacity(용량)을 넘고 있음

    • Hadoop 상의 데이터 구조에 문제

    • 애시당초 서버 리소스 부족

 우선, 데이터 import 처리입니다만, 마스터 데이터의 내용을 Hive에 반영하기 위해 정기적으로 데이터 갱신을 하고 있었습니다만, 그 처리가 single process로 동작하도록 되어 있어, 데이터 급증에 따른 처리 시간이 늘어나 있었습니다. 또한, Hadoop Cluster 대수를 늘리는 등의 방식으로 해결되지 않는 조정하기 어려운 구조였기 때문에 개선 작업이 급선무였습니다.


 근본적으로는 서버 증설 밖에는 대책이 없었습니다만, Hadoop 상에 보존되어 있는 데이터 구조에 몇 가지 개선 가능한 포인트가 있었기 때문에 그 부분을 손보았습니다.


Import 처리의 개선 (Sqoop)
 데이터 import에 대해서는Sqoop이라는 OSS를 채용 하였습니다.
 Sqoop은 현재는 Apache의 Top-Level Project 로서 개발이 진행되고 있는 OSS로, 주로 RDB와 Hadoop, Hive 간에 효율적으로 데이터 import, export를 실행하기 위한 툴입니다.
 기본적인 구조에 관한 그림은 Cloudera 블로그에서 인용하였습니다.

 

(출처 : http://www.cloudera.com/blog/2012/01/apache-sqoop-highlights-of-sqoop-2/

 Sqoop은 기본적으로 커맨드라인으로 동작하는 툴입니다. 몇 가지 옵션을 지정하면 꽤 괜찮게 RDB와 Hadoop, Hive 간의 import, export 처리를 해 줍니다.
 (참고 : Sqoop의 import 커맨드 옵션 일람)


 Sqoop에서 import 처리를 실행하면, MapReduce 의 Job (정확히는 Map Only) 이 기동하여, Map Task 중에 설정 내용에 따라 원래 데이터를 가지는 데이터베이스에 접속하여 dump를 하고, 그 결과를 직접 HDFS에 써넣는 방식으로 작동합니다.

 

 

(출처 : https://blogs.apache.org/sqoop/entry/apache_sqoop_overview
Import 처리 효율화 관점에서 말하자면, 아래 2가지가 고속화에 기여하고 있습니다.

  • 병렬적 import 처리가 가능

  • dump한 데이터를 직접 HDFS에 써넣을 수 있음

 Hadoop을 사용하여 비교적 간단히 병렬 import가 가능한 것도 편리하구요. 이러한 import 처리를 자신의 스크립트에 써두면 우선 로컬 파일상의 dump를 보존하여 이를 HDFS에 put하는 여러 단계의 처리를 거치는 경우가 많아, 특히 HDFS로 큰 파일을 put 하는 것은 아무래도 시간이 걸리는 작업이기 때문에 이러한 부분에서 시간을 절약합니다.
 그런 의미로, Hadoop이나 Hive에서 사용하는 데이터 import에는 Sqoop은 비교적 적합하다고 생각합니다.

튜닝 결과 (Sqoop)
 이번 Sqoop 도입에 의해 처리가 2시간 정도 고속화 되었습니다.

 

 

데이터 구조 개선
 이어서 데이터 구조 개선입니다.

 

 

 LINE의 통계 분석 기반에서는 Hive Table에서 다루는 데이터는 기본적으로는 TextFile 형식으로 보존되고 있었습니다. 물론 Text 형식인 편이 운용면에서는 여러가지 편리한 면도 있고, 이전에는 이걸로 성능적인 문제는 없었습니다만, 이 정도로 데이터가 커지고 나니 좀 더 효율적인 데이터 형식으로 변경할 필요가 있었습니다.
 또한, 다루는 데이터가 크기 때문에 데이터 압축에 대해서도 신경 쓸 필요가 있었습니다.
 이 외에도 데이터에 따라서는 한 파일 당 사이즈가 너무 작은 (HDFS의 한 블록 사이즈 보다도 극단적으로 작은) 것이 존재하고 있어 퍼포먼스에 악영향을 주고 있었습니다.

 이러한 몇가지 점을 착실히 개선 해 보았습니다.

 

TextFile->RCFile로의 마이그레이션
 우선은 Hive 처리에 적합하면서 또한, 데이터 사이즈를 가능한 한 작게 할 수 있는 파일 포맷을 선정합니다. 여러가지로 조사한 결과, 이번 케이스에서는 RCFile이라는 데이터 형식을 채용하는 것이 좋지 않을까 라는 결론이 나왔습니다.
 RCFile은 Hadoop (HDFS) 상에서, 예를 들어 Hive와 같이 구조적인 데이터를 다룰 때에 적합한 파일 포맷입니다. 자세한 내용은 아래 논문을 참조 바랍니다.

RCFile: A Fast and Space-efficient Data Placement Structure in MapReduce-based Warehouse Systems (ICDE’11)

 

 

 특히 개인적으로 좋았던건, 데이터 사이즈를 가능한 한 작게 만들기 위한 고민이 여기저기 녹아 있는 점입니다. 메타데이터의 run length 부호화도 그렇고, 동일한 컬럼 단위로 데이터가 보존되어 있기 때문에 연속해서 같은 경향의 데이터가 출현할 확률이 높아, 압축 알고리즘에 따른 압축을 쉽게 할 수 있습니다.
  실제로 다른 데이터 형식 (TextFile, SequenceFile) 과 비교해도, RCFile이 데이터 사이즈면에서는 가장 우수 했습니다. 아래는 완전 동일한 데이터군을 각각의 파일 형식으로 '압축하지 않은 것'과 '압축 상태인 것'으로 저장하여 어느 정도 사이즈 차이가 나는지를 비교한 것입니다.

 

 

 

 기존의 데이터 중에는 TextFile이면서 압축하지 않은 상태로 저장되어 있는 것도 있었기 때문에, 이번에는 Hive Table의 데이터는 전부 'RCFile + GZIP' 형식으로 저장하도록 변경 했습니다.

※ 실제로는 아직 일부 마이그레이션 중입니다.

 

너무 자잘한 파일의 merge
 Hadoop HDFS 상에서는 블럭 사이즈보다도 극단적으로 작은 파일이 많이 있으면 별로 효율적으로 처리를 하지 못합니다. 이 부분은 매우 유명한 이야기로, Cloudera의 'The Small Files Problem' 이란 블로그 기사가 매우 쉽게 쓰여 있어 참고 할 만 합니다. 


 자잘한 파일의 merge는 일반적으로는 HAR를 사용하는 경우가 많은 것 같습니다만, 이번에는 Hive merge와 관련된 옵션을 사용했습니다.

 

 

 'hive.merge.mapfiles', 'hive.merge.mapredfiles' 로 출력 파일의 merge 처리 on/off 설정을 합니다. 사이즈 조정은 'hive.merge.size.per.task', 'hive.merge.smallfiles.avgsize' 에서 처리 합니다. 이것들을 조합하여, 블록을 유효하게 쓸 수 있는 사이즈로 조정해서 파일을 출력할 수 있습니다.

 아래는 약간 실무와는 좀 별개의 수치이긴 합니다만, 실제 merge 전후에 어느 정도의 처리시간 차이가 나는지, count 문과 select 문 (where hoge=xxx)를 발행 해 본 비교 결과 입니다.

 

 

튜닝 결과 (데이터 구조 변경)
 이번에 측정한 시점에서는 모든 데이터 마이그레이션 아직 끝나지 않았기에 효과는 어중간 했지만, 그래도 1시간 정도 처리 속도가 개선 되었기 때문에, 위에서 얘기한 수법은 튜닝 방향으로서는 틀리지 않았다고 생각합니다.

 

 

서버 증설
 자, 주저리주저리 길게 썼는데요. 튜닝 작업에는 한계가 있어, 마지막에는 서버 증설 (돈)에 의지해야만 하는 경우도 있습니다.
 이번 같은 경우에도 본질적으로는 그런 상황이었기 때문에, 기존의 Hadoop Cluster의 DataNode/TaskTracker 대수를 2배로 늘렸습니다.

 

 

 

 단번에 6시간이나 단축...
 어쨌든 뭐랄까, 최종적으로는 돈이 중요하다는 현실에 직면하게 되었네요.

 실제로 데이터 구조 튜닝에 의해 잠재적으로 처리는 효율화 되었지만, 그래도 아직 남아있었던 용량 부족 문제는 서버 증설 작업으로 해결되었습니다. 아무것도 하지 않았던 때보다 처리속도 개선 효과가 컸다고 생각합니다.

 

정리
 어쨌든지 간에 다양한 수단을 구사하여 분석 시간을 9시간 정도 단축 할 수 있었습니다.

 

 

 마지막으로, 이번 튜닝 과정 중 느낀 점을 정리하는 측면에서 적고 이 글을 마치고자 합니다.

 

 

 

 이상으로, '캐쥬얼'한 규모의 데이터 클러스터에서의 분석 처리 튜닝 과정에 대해 적어 보았습니다.
 이 블로그 내용이 여러분에게 조금이라도 도움이 된다면 기쁠 것입니다.


Appendix.
 통계분석 기반에서 현재 다루고 있는 데이터는 기껏해야 PB 클래스의 '캐쥬얼'한 사이즈의 데이터입니다만, 이후 데이터 사이즈 증가를 고려 했을 땐 현재 상황 보다 효율적인 데이터 구조를 검토할 필요가 있습니다.
 또한, 현재는 거의 Daily 혹은 Hourly 분석이 중심이지만, 좀 더 빠른 데이터 분석을 하고 리얼타임에 가까운 형태로 분석 결과를 표시하는 구조도 필요하게 될겁니다.

 

 데이터 구조에 대해서는, 현재 개인적으로 관심을 가지고 있는 것이 'CIF(ColumnInputFormat)' 이라 불리는 것입니다. 데이터 로컬리티에 신경을 써서 같은 데이터 스키마를 동일한 DataNode로 나눠 할당하는 방법이나, 시리얼라이즈(serialize)나 데이터 압축 등을 고안해서 처리 고속화를 실현하는 수법이라 합니다. 
 아직 실제로 실험 해 보지는 않았지만, 논문 베이스로는 데이터를 읽어 들이는 속도가 RCFile보다 38배 빠르다고 합니다.
Column-Oriented Storage Techniques for MapReduce(VLDB ’11)
Sandeep’s Research Notes: RCFile and CIF


 또, 실 서비스 예로는, Treasure Data 에서는 내부 데이터 구조에 MessagePack을 이용하여 높은 압축률을 실현하고 있다고 합니다.
Five Criteria of Next Generation Data Warehouse | Treasure Data Blog

 Check. We achieve a 5-10x compression ratio. Columnar data storage helps with compression considerably, but our secret sauce is a binary serializer called MessagePack. MessagePack is space-efficient and incredibly fast to serialize and deserialize. One of our co-founders is the original author of MessagePack, and we use it extensively throughout our stack.

 

 이런 실사례를 참고하면서, 보다 효율적인 데이터 구조 채용을 검토 해 나가고 싶습니다.

 데이터의 준 리얼타임 분석에 관해서는 이른바 Streaming Processing의 개념과 이를 실현할 미들웨어 필요 해 집니다.
 가장 유명한 것으로는 Twitter 회사가 도입하고 있다는 'Storm' 일까요.
 또, 로그 콜렉터로 유명한 'Fluentd' 에관해서도, Aggregation을 실행하는 플러그인 등도 있고, 생각하기에 따라 Streaming Processing이 가능 해 질 거라 생각합니다.

 이것들도 여러가지로 실험 해 보고 싶습니다.


 잘나가는 서비스의 뒷면에는, 잘나가는 데이터 분석 기반이 존재한다는 것이 통설(?)이기에, 저희들도 가능한 그 경지에 가까이 다가갈 수 있도록 노력 해 나가고자 합니다.

 

 

 

Posted by 이슈타르네스