Python boto 로 AWS EMR 사용하기
www.recopick.com

안녕하세요. RecoPick 김태현입니다.
이번 글에서는 Python boto로 Amazon Elastic MapReduce(EMR)  사용하는 방법에 대해서 간단한 예제와 함께 살펴보겠습니다.

RecoPick 에서는 대용량 추천 계산을 위해 EMR을 사용하고 있습니다. EMR은 Hadoop Framework 을 쉽게 사용할 수 있도록 AWS에서 제공해주는 서비스입니다.
EMR에서는 실제 장비를 사용한 시간만큼만 요금이 부과되기 때문에 24시간 Hadoop Platform을 사용하지 않는다면, 비용절감을 위해 EMR 적용을 추천해드립니다.

오늘은, EMR을 구동하기 위한 다양한 방법이 있겠지만, 그중 하나인 Python boto를 이용하는 방법에 대해서 간단한 예제와 함께 설명해 드리려고 합니다.
boto는 AWS 서비스를 python에서 쉽게 사용할 수 있도록 제공되는 python package 입니다. 설치 및 관련 API는 아래 참고를 확인하세요.

1. EMR을 사용하기 위해 관련된 package를 import 합니다.


  import boto.emr

  

  from boto.emr.step import JarStep

  from boto.emr import BootstrapAction


2. EMR을 사용하기 위한 connection을 맺습니다.


  conn = boto.emr.connect_to_region(

      aws_access_key_id='YOUR_KEY_ID',

      aws_secret_access_key='YOUR_ACCESS_KEY',

      region_name='YOUR_REGION_NAME')


3. 별도의 Hadoop 설정이 필요하다면 필요한 설정들을 list로 만듭니다.


  hadoop_config_params = ['-m', 'mapred.child.java.opts=-Xmx2g',

                          '-m', 'mapred.reduce.memory.mb=3072',

                          '-m', 'mapred.job.reuse.jvm.num.tasks=-1',

                          '-m', 'mapred.map.tasks.speculative.execution=false',

                          '-m', 'mapred.reduce.tasks.speculative.execution=false',

                          '-h', 'dfs.replication=2']


4. Bootstrap Action을 사용하여 Hadoop 설정을 진행하고, ganglia 모니터링이 필요하다면 이것 역시 Bootstrap Action을 통해 install 합니다.


  hadoop_config_bootstrapper = BootstrapAction('hadoop-config',

                                               's3://elasticmapreduce/bootstrap-actions/configure-hadoop',

                                               hadoop_config_params)

  hadoop_monitor_bootstrapper = BootstrapAction('ganglia-config',

                                                's3://elasticmapreduce/bootstrap-actions/install-ganglia',

                                                '')


5. 실제 프로그램에서 사용할 main class와 관련 args를 list로 설정하고


  step_args = ['com.recopick.MainClassPath',

               '--arg1',

               'arg1',

               '--arg2',

               'arg2']


6. Hadoop 작업에 사용될 jar 파일의 s3 위치를 지정합니다.


  jar_path = 's3n://jar_path'

 
7. 위에서 만들어진 step_args와 jar_path를 이용하여 Hadoop job 하나를 step으로 생성합니다. 추가 step이 필요하다면 이 과정을 반복하면 됩니다.


  job_steps = JarStep(name='my-steps',

                      jar=jar_path,

                      step_args=step_args)


8. 생성된 step list를 argument로 EMR을 실행합니다. 이때 Hadoop version 및 ec2 장비 type과 장비 수를 설정할 수 있습니다. 


  job_id = conn.run_jobflow(name='job_name',

                            ami_version='3.0.4',

                            ec2_keyname='keyname',

                            availability_zone='zone',

                            master_instance_type='master_ec2_type',

                            slave_instance_type='slave_ec2_type',

                            num_instances='ec2_num',

                            log_uri='s3://recopick-snoop/log/snoop-emr/',

                            bootstrap_actions=[hadoop_config_bootstrapper],

                            steps=[job_steps])

 
9. ec2 장비에 tag를 할당해서 다른 ec2 장비와 용도를 구분할 수 있고, ‘costCenter’ tag를 이용하면 각 EMR 작업마다 요금을 구분할 수도 있습니다.


  tag_dict = {}

  tag_dict['Name'] = 'emr.' + job_id

  conn.add_tags(job_id, tag_dict)


참고로 ec2 장비 type에 따라 네트워크 속도가 다르므로, 빠른 처리속도를 원한다면 높은 사양의 ec2를 사용하는 것을 추천해드립니다.
RecoPick 에서는 process 수를 같게 맞췄을 때 m1.medium보다 c1.xlarge를 사용했을 때 약 20% 정도 처리속도가 빨라진 것을 경험할 수 있었습니다.
 
아래는 전체 sample script입니다.


  import boto.emr

 

  from boto.emr.step import JarStep

  from boto.emr import BootstrapAction

 

  conn = boto.emr.connect_to_region(

      aws_access_key_id='YOUR_KEY_ID',

      aws_secret_access_key='YOUR_ACCESS_KEY',

      region_name='YOUR_REGION_NAME')

 

  hadoop_config_params = ['-m', 'mapred.child.java.opts=-Xmx2g',

                          '-m', 'mapred.reduce.memory.mb=3072',

                          '-m', 'mapred.job.reuse.jvm.num.tasks=-1',

                          '-m', 'mapred.map.tasks.speculative.execution=false',

                          '-m', 'mapred.reduce.tasks.speculative.execution=false',

                          '-h', 'dfs.replication=2']

 

  hadoop_config_bootstrapper = BootstrapAction('hadoop-config',

                                               's3://elasticmapreduce/bootstrap-actions/configure-hadoop',

                                               hadoop_config_params)

  hadoop_monitor_bootstrapper = BootstrapAction('ganglia-config',

                                                's3://elasticmapreduce/bootstrap-actions/install-ganglia',

                                                '')

 

  step_args = ['com.recopick.MainClassPath',

               '--arg1',

               'arg1',

               '--arg2',

               'arg2']

 

  jar_path = 's3n://jar_path'

 

  job_steps = JarStep(name='my-steps',

                      jar=jar_path,

                      step_args=step_args)

 

  job_id = conn.run_jobflow(name='job_name',

                            ami_version='3.0.4',

                            ec2_keyname='keyname',

                            availability_zone='zone',

                            master_instance_type='master_ec2_type',

                            slave_instance_type='slave_ec2_type',

                            num_instances='ec2_num',

                            log_uri='s3://recopick-snoop/log/snoop-emr/',

                            bootstrap_actions=[hadoop_config_bootstrapper, hadoop_monitor_bootstrapper],

                            steps=[job_steps])

 

  tag_dict = {}

  tag_dict['Name'] = 'emr.' + job_id

  conn.add_tags(job_id, tag_dict)


참고
1. https://aws.amazon.com/ko/elasticmapreduce/
2. https://github.com/boto/boto
3. http://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/useprogramaccess.html


Posted by recopick

댓글을 달아 주세요