RecoPick



안녕하세요. RecoPick 팀의 김성민 입니다.

이번에는 Hadoop Job Counter 제한 초과 오류에 이어서 Hadoop map-reduce 작업 중 발생하는 OOM (Out of memory) 에러에 대해서 공유 드리고자 합니다. 


현상


syslog logs

2014-04-28 18:38:05,051 FATAL org.apache.hadoop.mapred.Task: attempt_201402170624_239030_r_000008_0 : Map output copy failure : java.lang.OutOfMemoryError: Java heap space
        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.shuffleInMemory(ReduceTask.java:1640)
        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.access$3000(ReduceTask.java:1197)
        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.shuffle(ReduceTask.java:2364)
        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.getMapOutput(ReduceTask.java:1511)
        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:1360)
        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:1292)



원인


Hadoop map-reduce 작업은 아래 그림과 같이 4단계로 이루어지는데, 


Figure 1. Shffle and sort in MapReduce



map 태스크의 결과가 reduce 태스크로 복사될 때, map의 결과 데이터 치수가 작을 경우, reduce 태스크의 JVM 메모리에 복사됩니다. 
그런데 reduce 태스크의 JVM 메모리가 작은 경우, 위와 같이 Out Of Memory Error가 발생합니다. 
reduce 태스크의 JVM 메모리로 복사될 수 있는 map 태스크의 데이터 치수는 mapred.job.shuffle.input.buffer.percent (default 값은 0.70) 설정으로 조정 가능합니다.  

해결 방법


reduce 태스크의 JVM 메모리에 맞게 mapred.job.shuffle.input.buffer.percent 값을 0.70 -> 0.20 으로 설정해서 map-reduce 작업을 수행하도록 하였습니다.

[참고 자료]

(1) 7 Deadly Hadoop Misconfigurations  by Kate Ting, Cloudera
http://archive.apachecon.com/na2013/presentations/27-Wednesday/Big_Data/14:45-7_Deadly_Hadoop_Misconfigurations-Kathleen_Ting/HadoopTroubleshootingApacheCon.pdf
http://files.meetup.com/12611842/03-27-14-Cloudera.pdf

(2) Hadoop Troubleshooting 101 by Kate Ting, Cloudera
http://www.slideshare.net/cloudera/hadoop-troubleshooting-101-kate-ting-cloudera

(3) Hadoop: The Definitive Guide, 3rd Edition 




Posted by recopick

안녕하세요. RecoPick팀 김성민입니다.

hadoop으로 map-reduce 작업을 수행할 때, 작은 크기의 파일들이 많을 경우, 어떤 문제점이 있고 이러한 문제점을 hadoop에서 어떻게 해결할 수 있는지 간략하게 정리해 봤습니다.

일반적으로 로그 데이터를 처리하는 경우, 대부분 시간 단위로 저장되는 데이터이기 때문에, 작은 크기의 로그 파일들이 많이 생겨납니다. 이러한 로그 데이터처럼, 크기는 작지만, 파일 개수가 많은 데이터를 map-reduce 작업으로 처리하고자 할 경우, map task는 최소 파일 개수 만큼 생성됩니다. 


원래 hadoop에서는 입력 데이터를 일정한 크기(기본은 HDFS block 크기인 64MB)로 나눠서 input split (이하 block이라고 함)을 생성한 다음, map task를 실행합니다. 하지만 하나의 파일의 크기가 block 크기보다 작더라도, 다른 파일의 데이터를 읽어서 나머지 block 크기만큼의 데이터를 채워서 하나의 map task에서 처리 하도록 하지는 않습니다. 크기가 작은 파일이더라도, 파일 하나당 하나의 map task가 생성되고, map task들은 적은 입력 데이터만 처리하고 종료하게 됩니다. 

 
예를 들어, 하나의 서버에서 동시에 2개의 map task처리할 수 있고, 이러한 서버 다섯 대로 구성된 hadoop 클러스터가 있다고 가정하면, 한 번에 실행될 수 있는 최대 map task 수는 (장비 당 최대 실행 가능한 map task 수) x (Task Node 수) = 2 * 5 = 10 개입니다. 그런데 10분 간격으로 로그 데이터( < 64MB 작은 파일들)를 저장할 경우, 1시간에 6개, 1일이면, 144개의 로그 파일들이 생성됩니다.


앞서 가정한 hadoop 클러스터에서 1일 동안의 로그 분석을 하고자 한다면, 결국 최소 144개의 map task가 필요합니다. 결국, 1일 데이터를 처리하는 동안 사용 가능한 최대 map task slot을 모두 차지하게 되므로, 다른 map-reduce 작업을 스케줄링하는데 어려움이 생기게 됩니다.


그리고 map task가 한번 실행되고, 종료 될 때 마다, map task 당 jvm이 한 번씩 실행되고, 종료되기 때문에 jvm이 실행되고 종료되는 시간이 전체 작업 시간을 지체시킬 수 있습니다. (물론, jvm을 재사용 옵션 mapred.job.reuse.jvm.num.tasks을 사용하면, 이런 문제는 피할 수 있습니다.)


또한, hadoop의 HDFS에 저장되는 모든 파일의 메타 정보들은 name node의 메모리에 저장되는데, 파일 개수가 많아지면, name node의 메모리를 더 많이 사용하게 되고, disk 공간은 남아 있음에도 불구하고, name node의 메모리가 부족해서 더 이상 파일을 저장할 수 있는 상황도 발생할 수 있습니다.

그렇다면, 이런 작은 크기의 파일들을 어떻게 하면 효과적으로 hadoop에서 처리 할 수 있을까요?

가장 쉽게 떠오르는 아이디어는 크기가 작은 파일들이 여러 개 존재하는 것이 문제이므로, 여러 개의 작은 파일들을 최소한 block 크기보다 크게 합쳐서 저장하는 방법을 생각할 수 있습니다. 


조금 더 구체적으로 적어보면,

  1. 처음부터 HDFS에 데이터를 기록할 때, 하나의 파일에 계속 append 하는 방식으로 작은 크기의 파일을 하나로 합치는 방법
  2. 주기적으로 여러 개의 작은 파일들을 하나로 합치는 별도의 작업을 실행해서 하나의 큰 파일로 만드는 방법
  3. hadoop archive 파일 (har)을 이용하는 방법
등을 떠올릴 수 있습니다.


그런데 로그 데이터와 같은 시간 단위 데이터를 저장하기 할 때,


  • clicklog.2013-12-05_0700
  • clicklog.2013-12-05_0800
  • clicklog.2013-12-05_0900
  • clicklog.2013-12-05_1000
  • clicklog.2013-12-05_1100
  • clicklog.2013-12-05_1200


위 와 같이 한 시간 단위로 나눠서 저장하는 대신, 


  • clicklog.2013-12-06
  • clicklog.2013-12-07
  • clicklog.2013-12-08
  • clicklog.2013-12-09
  • clicklog.2013-12-10


위와 같이 대용량의 파일(하루 단위)로 모아서 저장할 경우, 일정 기간(12/5 7시~10시까지)의 데이터만 필요한 경우에는 데이터를 다시 분리를 해야 하기 때문에 불편할 수 있습니다.


이런 경우를 대비해서 파일 이름(시간 및 날짜 정보)을 키로 하고, 파일 내용 전체를 값으로 해서 sequence file 혹은 map file 형태로 저장하면, 여러 개의 작은 파일들을 하나의 큰 파일로 저장할 수 있으면서도, 필요한 만큼의 데이터를 쉽게 찾을 수 있습니다. 특히, sequence file을 사용할 경우, block 단위로 압축할 수 있기 때문에, 더욱 많은 작은 파일들을 효과적으로 하나의 큰 파일로 만들 수 있습니다.

지금까지는 작은 파일들을 하나의 큰 파일로 합치는 방법을 생각해 보았습니다. 그런데 작은 파일들을 하나의 큰 파일로 합치기 위해서 별도의 작업을 하는 것이 번거로울 때가 있습니다. 특히, 일회성 데이터를 분석을 위해 이런 방법을 써야 한다면, 데이터 분석 작업 보다 데이터를 준비하는 작업이 더 오래 걸릴 수 도 있습니다.

그렇다면, 작은 파일들을 합치지 않고, map-reduce 작업을 할 때, 불필요하게 많은 map task가 수행되지 않는 방법은 없을까요?


hadoop의 map-reduce 작업 수행 시, 여러 개의 파일이 하나의 map task의 입력으로 사용되지 않기 때문에, 최소한 파일 개수만큼 map task 작업이 수행됩니다. 반대로 생각하면, 만약, 단일 map task의 입력 데이터로 여러 개의 파일을 사용할 수 있다면, 총 파일 개수보다 더 작은 수의 map task로 map-reduce 작업을 수행할 수 있게 됩니다.


hadoop의 map task는 기본적으로 TextInputFormat이라는 클래스를 이용해서 입력 데이터를 읽어오는데, 여러 개의 파일을 하나의 입력으로 읽어올 수 있는 InputFormat 클래스가 있다면, 작은 파일들을 하나로 합치지 않고도 적정한 개수의 map task 수를 유지하면서 map-reduce 작업을 수행 할 수 있지 않을까요?

이런 요구 사항에 딱 맞는 CombineFileInputFormat이라는 클래스가 있습니다. 하지만 CombineFileInputFormat 클래스는 추상 클래스이므로, 자신의 상황에 맞게 직접 구현해서 사용해야 합니다. 최근에 Process Small Files on Hadoop Using CombineFileInputFormat 이라는 블로그 포스트에서 CombineFileInputFormat 클래스에 대한 구현 예제와 성능 실험에 대한 내용이 공유되었으니, 구현 시 참고하시면 좋을 것 같습니다.


CombineFileInputFormat 클래스는 사용하면, 너무 많은 map task가 수행되는 문제점은 해결 할 수 있지만, name node 메모리 문제는 여전히 남아 있습니다. 


앞서 설명한 네 가지 방법들을 사용하면, 두 가지 문제를 모두 어느 정도까지는  해결 할 수 있지만, 여전히 작은 파일들을 하나로 합치기 위한 별도의 작업이 필요하다는 점에서는 조금 귀찮은 부분이 있습니다.

너무 많은 개수의 map task 수와 name node 메모리 문제를 모두 해결하기 위해서 저희 팀에서 선택한 방법은 데이터를 hbase에 저장하는 방식입니다.

hbase는 처음부터 HDFS의 기본  block 크기(64MB)보다 더 작은 크기의 데이터를 저장하기 위해서 설계되었으므로, 작은 파일들을 효율적으로 저장할 수 있습니다. 따라서 작은 파일들을 저장하기 위해서 하나의 큰 파일로 합치는 별도의 작업을 할 필요가 없습니다.


또한, 로그 데이터처럼 시간 단위로 데이터를 저장해야 할 경우, hbase의 row key를 시간으로 설정하여 저장하면, 필요한 범위의 시간 데이터를 쉽게 가져올 수 있기 때문에, 파일로 저장했을 때보다 데이터 접근이 조금 더 편리 해 집니다.

hbase 데이터로 map-reduce 작업을 수행할 때, map task 수는 입력 데이터를 가지고 있는 region 개수만큼 실행됩니다. 그래서 region 개수를 작게 하면, 불필요하게 많은 map task 생성되는 것을 피할 수 있습니다. region 개수는 데이터 크기 즉, 저장되는 row 수와 비례하지만, hbase 테이블 설계나 설정을 통해서 적절한 개수를 유지할 수 있기 때문에 HDFS에서 매번 작은 파일들을 하나로 합치는 작업을 하는 것보다는 관리가 쉽습니다.

지금까지 얘기한 작은 크기의 파일들을 하둡에서 효과적으로 처리하는 방법을 요약하자면, 다음과 같습니다.


  1. hdfs에 저장할 때, 파일 내용을 append하는 방법
  2. 작은 파일들을 하나로 합치는 작업을 주기적으로 실행하는 방법
  3. hadoop archive (har) 파일을 이용하는 방법
  4. 파일 이름과 파일 내용을 각각 키와 값으로 해서 sequence file로 저장하는 방법
  5. CombineFileInputFormat을 이용해서 맵 리듀스 작업을 수행하는 방법
  6. HBase에 데이터를 저장하는 방법



[참고 자료]

The Small Files Problem


File Appends in HDFS

Hadoop Archive: File Compaction for HDFS

Process Small Files on Hadoop Using CombineFileInputFormat


Hadoop I/O: Sequence, Map, Set, Array, BloomMap Files

Hadoop: The Definitive Guide (3rd Edition) Ch 7. MapReduce Types and Formats - Processing a whole file as a record


filecrush - a highly configurable tool by Edward Capriolosmall files on HDFS


Dealing with hadoop's small files problem












Posted by recopick