Storm 을 이용한 실시간 데이터 처리
www.recopick.com

안녕하세요. RecoPick팀의 김태현입니다.

일반적으로 대용량 데이터 처리를 위해 batch 방식의 Hadoop을 많이 사용하게 되는데, 오늘은 RecoPick 에서 실시간 데이터 처리를 위해 사용 중인 Storm을 간략하게 소개하고 몇 가지 팁들을 공유 해 드리겠습니다.

Storm
StormBackType에서 개발됐고, 이후 BackType이 Twitter에 인수되면서 실시간 데이터 분석용으로 사용되고 있는 분산처리시스템입니다


Storm 특징
 - 실시간 처리 (Realtime processing)
 - 로컬 및 분산 모드 지원
 - 확장성 (Scale out)
 - 장애허용 (Fault tolerance)
 - 뛰어난 성능
 - 다양한 프로그래밍 언어 지원 (Java, Python, Scala, Ruby, Closure)

Storm Cluster
Strom Cluster는 Nimbus, Supervisor, Zookeeper 3가지로 구성되어 있으며, 관리 도구인 UI를 포함하고 있습니다.

  • Nimbus: Master 노드로 수행할 코드를 Cluster에 배포, Supervisor에 작업을 할당하고 관리하며 장애관리 등의 역할을 수행합니다.  (Hadoop의 job tracker와 비슷)
  • Supervisor: Worker process로 작업을 할당받아 처리하는 역할을 수행합니다. (Hadoop의 task tracker와 비슷)
  • Zookeeper: Storm Cluster를 관리합니다.


Topology
Topology는 Stream의 처리 흐름을 정의하는 그래프로 표현되며, Stream, Spout과 Bolt로 구성되어 있습니다.
 - Stream: Tuple들의 시퀀스
 - Spout: Stream의 소스
 - Bolt: Input Stream을 가공해서 새로운 Output Stream으로 변환 또는 처리
 


Stream Grouping
Stream Grouping은 stream의 Tuple을 Bolt에게 전달하는 방법을 나타냅니다.
 - Shuffle Grouping: Tuple을 random 하게 Bolt에 할당 
 - Field Grouping: Tuple에 있는 필드 값을 기준으로 파티셔닝 한 후 각 Bolt에 할당
 - All Grouping: Tuple을 모든 Bolt에 할당
 - Global Grouping: Tuple을 하나의 Bolt에 할당

RecoPick 에서는
RecoPick 에서는 아래 ETL 단계에서 서비스별 로그를 파싱하고 분석하여 NoSQL(HBase)에 저장하는 용도로 Storm을 사용 중이며, 각종 통계 정보를 실시간으로 확인하기 위한 역할을 수행하고 있습니다.

추가로, API Server에서 메시지를 전달받기 위해 Kafka를 메시지 큐로 사용하고 있으며 Storm에서는 Kafka 연동을 위해 kafkaspout(Kafka consumer를 사용해 Spout을 직접 구현해도 됨) 을 사용하고 있습니다.
메시지 큐로 Kafka를 선택한 이유는 Storm과 호환 할 수 있고, community 활동이 가장 활발하게 진행되고 있는 곳 중 하나이며,  LinkedIn에서 이미 사용 중인 신뢰할 수 있는 reference를 가지고 있기 때문입니다. Kafka와 함께 검토한 다른 solution에 관한 정보는 아래 표를 참고하시면 됩니다.



현재 RecoPick에서는 AWS의 m1.medium instance 네 대에 Storm을 설치하여 초당 500개 이상의 데이터를 처리하고 있으며, 앞으로 서비스가 늘어나 데이터가 증가하더라도, 쉽게 scale out이 가능하도록 시스템이 구성되어 있습니다.  

Tips 

  1. Storm의 topology.max.spout.pending 값(default 1)을 시스템에 맞게 잘 조정해야 합니다. 너무 큰 값으로 설정하면 stream queue가 overflow 될 수 있고, 반대로 너무 작은 값으로 설정하면 원하는 성능이 나오지 않을 수 있으니 운영하는 환경에 맞게 많은 실험을 거쳐 적절한 값을 찾아내야 합니다.
  2. Acker 수는 Spout 수와 같게 설정합니다. Acker 자체가 load가 많이 걸리는 작업은 아니지만 부족할 경우 ack 처리가 밀리게 되어 성능 저하의 원인이 될 수 있습니다.
  3. Storm과 Kafka를 같이 사용할 경우 병렬 처리를 위해 Spout 수를 조정하게 되는데, 이때 Kafka의 partition 수도 같이 조정해줘야 합니다. Kafka의 partition 변경 없이 Spout 수만 변경할 경우 partition 수만큼의 Spout만 동작하게 되고 나머지 Spout은 동작하지 않습니다.
  4. 마지막으로, zookeeper timeout 문제를 방지하기 위해 storm.zookeeper.connection.timeout 을 적절한 값으로 설정합니다.


마치며
RecoPick 에서는 앞으로 위 실시간 시스템에 CEP(Complex Event Processing)를 도입하여 RecoPick을 사용하는 고객에게 좀 더 많은 실시간 정보를 제공하는 플랫폼으로 발전시켜 나갈 계획입니다. (예를 들어 쇼핑몰에서 특정 기획 및 광고 마케팅에 따른 고객 피드백을 실시간으로 확인할 수 있다면, 해당 고객에게 많은 도움이 될 것으로 생각합니다.)

참고자료
 - https://github.com/nathanmarz/storm/wiki/Tutorial
 - https://blog.twitter.com/2011/storm-coming-more-details-and-plans-release
 - https://kafka.apache.org/documentation.html#gettingStarted
 - https://wikitech.wikimedia.org/wiki/Analytics/Kraken/Logging_Solutions_Overview 
 - https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka 



Posted by recopick

댓글을 달아 주세요