MAP REDUCE: Hints

Just in case you need to write something with MRJob or Hadoop Streaming

  • Hadoop Streaming example

      $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
          -D mapreduce.job.reduces=1 \
          -D my.java.option=3 \
          -input "/path/to/input/dir/"  \
          -output "/path/to/output/dir/" \
          -mapper "mymapper.py"  \
          -reducer "myreducer.py"
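The `mymapper.py` / `myreducer.py` names above are placeholders. A streaming mapper and reducer are just programs that read lines from stdin and write tab-separated key-value lines to stdout; between the two phases Hadoop sorts by key. A rough word-count sketch of that contract in plain Python (both phases in one file, shuffle simulated with `sorted`):

```python
from itertools import groupby

def mapper(lines):
    # Streaming mapper contract: read text lines, emit (key, value) pairs
    # (on a real cluster these would be tab-separated lines on stdout).
    for line in lines:
        for word in line.split():
            yield word, 1

def reducer(pairs):
    # Streaming reducer contract: input arrives sorted by key, so all
    # values for one key are adjacent; sum the counts per word.
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        yield word, sum(count for _, count in group)

# Simulate the shuffle (sort by key) that Hadoop performs between phases.
shuffled = sorted(mapper(['to be or not to be']))
counts = dict(reducer(shuffled))
print(counts)  # {'be': 2, 'not': 1, 'or': 1, 'to': 2}
```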
  • Python mrjob on YARN example (put this into run.sh in the project)

    python3.6 mr_job.py \
          -r hadoop \
          --hadoop-streaming-jar=/usr/hdp/2.4.0.0-169/hadoop-mapreduce/hadoop-streaming.jar \
          --python-bin=python3.6 \
          --total-words=5 input1.txt input2.txt

    python3 script.py -r hadoop hdfs:///user/raj_ops/testHW1_1.txt --hadoop-streaming-jar=/usr/hdp/2.6.4.0-91/hadoop-mapreduce/hadoop-streaming.jar
    python3 mr_log.py -r local ../test_data/log_full.txt > answer.csv
  • Specifying number of reducers in streaming jar options

      -D mapreduce.job.reduces=1
  • Specifying key comparator in streaming jar options

      -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
      -D mapred.text.key.comparator.options=-nr
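For intuition, `-nr` makes the comparator treat keys as numbers and sort them in reverse, instead of the default lexicographic text sort. The difference in reducer input order can be simulated in plain Python (illustration only, not mrjob or Hadoop API):

```python
# Keys arrive at the reducer in the order the comparator defines.
keys = ['9', '100', '10', '2']

lexicographic = sorted(keys)                           # default text sort
numeric_reverse = sorted(keys, key=int, reverse=True)  # what -nr produces

print(lexicographic)    # ['10', '100', '2', '9']  ('100' < '2' as strings!)
print(numeric_reverse)  # ['100', '10', '9', '2']
```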
  • Passing streaming jar options via MRJob

      class MyMRJob(MRJob):
          JOBCONF = {
              'mapreduce.job.reduces': 1,
              'mapred.output.key.comparator.class':
                  'org.apache.hadoop.mapred.lib.KeyFieldBasedComparator',
              'mapred.text.key.comparator.options': '-nr',    # compare keys numerically in reverse order
          }
    
          def mapper(...):
             ...
    
          def reducer(...):
             ...
  • Dynamic jobconf

      class MRLogs(MRJob):
          def jobconf(self):
              conf = super().jobconf()
              conf.update({
                  'opt1': 'value',
                  'opt2': 'value2',
              })
              return conf
  • Outputting CSV or SequenceFile

      class MRLogs(MRJob):
    
          OUTPUT_PROTOCOL = mrjob.protocol.RawValueProtocol   # Do not output keys, only values.
    
          def hadoop_output_format(self):
              if some_condition:
                  return 'org.apache.hadoop.mapred.SequenceFileOutputFormat' # Output key-values in SequenceFile
    
          def reducer(self, key, values):
              yield key, 'my,comma,separated,data'
  • Enable Snappy compression of job's output (Hadoop Streaming options)

    • -D mapred.output.compress=true

    • -D mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec

    • -D mapred.output.compression.type=BLOCK
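The same compression settings can be passed from mrjob via `JOBCONF` (a sketch mirroring the -D options above; assumes the Snappy native libraries are available on the cluster):

```python
from mrjob.job import MRJob

class MRCompressedOutput(MRJob):
    # Same settings as the -D streaming options, as jobconf entries.
    JOBCONF = {
        'mapred.output.compress': 'true',
        'mapred.output.compression.codec':
            'org.apache.hadoop.io.compress.SnappyCodec',
        'mapred.output.compression.type': 'BLOCK',
    }
```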

  • Configuring options (note: newer mrjob versions rename these hooks to configure_args() and add_passthru_arg())

      class MRLogs(MRJob):
          def configure_options(self):
              super().configure_options()
              self.add_passthrough_option(
                  '--myopt', choices=['a', 'b'], default='a',
                  help='My option'
              )
    
          def jobconf(self):
              if self.options.myopt:
                  do_smthng()
    
          def mapper(self, _, line):
              if self.options.myopt:
                  do_smthng()
    
          def reducer(self, key, values):
              if self.options.myopt:
                  do_smthng()
  • Using distributed cache with Hadoop Streaming

      $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
          -input "/path/to/input/dir/"  \
          -output "/path/to/output/dir/" \
          -mapper "mymapper.py"  \
          -reducer "myreducer.py" \
          -file /path/to/my/file
  • Using distributed cache in mrjob

      class MRCityStats(MRJob):
          def configure_options(self):
              super().configure_options()
              self.add_file_option('--my-file')
              self.add_passthrough_option('--some-other-option')
    
          def mapper(self, _, line):
              with open(self.options.my_file) as f:    # File will be copied to all mapper/reducer current directories
                  do_smthng()
    
          def reducer(self, key, values):
              with open(self.options.my_file) as f:
                  do_smthng()
