MapReduce: Hints
Just in case you need to write a MapReduce job.
Hadoop Streaming example
```bash
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -D mapreduce.job.reduces=1 \
    -D my.java.option=3 \
    -input "/path/to/input/dir/" \
    -output "/path/to/output/dir/" \
    -mapper "mymapper.py" \
    -reducer "myreducer.py"
```
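For completeness, a minimal sketch of what `mymapper.py` and `myreducer.py` could look like (a plain word count over stdin/stdout; the scripts themselves are not part of the original example):

```python
#!/usr/bin/env python3
# mymapper.py: emit "word<TAB>1" for every word on stdin
import sys

for line in sys.stdin:
    for word in line.split():
        print(f'{word}\t1')
```

```python
#!/usr/bin/env python3
# myreducer.py: sum the counts per word; streaming feeds lines sorted by key
import sys

current_word, current_count = None, 0
for line in sys.stdin:
    word, count = line.rsplit('\t', 1)
    if word != current_word:
        if current_word is not None:
            print(f'{current_word}\t{current_count}')
        current_word, current_count = word, 0
    current_count += int(count)
if current_word is not None:
    print(f'{current_word}\t{current_count}')
```

Both scripts must be executable (or shipped with `-file` and invoked via `python3 ...`) for streaming to run them.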
Python mrjob on YARN examples (put such a command into `run.sh` in the project):

```bash
# run on YARN with a custom job option and an explicit python binary
python3.6 mr_job.py \
    -r hadoop \
    --hadoop-streaming-jar=/usr/hdp/2.4.0.0-169/hadoop-mapreduce/hadoop-streaming.jar \
    --python-bin=python3.6 \
    --total-words=5 \
    input1.txt input2.txt

# run on YARN, reading input from HDFS
python3 script.py -r hadoop hdfs:///user/raj_ops/testHW1_1.txt \
    --hadoop-streaming-jar=/usr/hdp/2.6.4.0-91/hadoop-mapreduce/hadoop-streaming.jar

# run locally for testing, redirecting output to a file
python3 mr_log.py -r local ../test_data/log_full.txt > answer.csv
```
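For context, a hypothetical skeleton of the `mr_job.py` that the first command invokes; the class name, the word-count logic, and the `--total-words` passthrough option are all assumptions, and the actual top-N selection is left out:

```python
# mr_job.py: hypothetical skeleton; --total-words is assumed to be
# a passthrough option defined by the job itself
from mrjob.job import MRJob

class MRTopWords(MRJob):
    def configure_options(self):
        super().configure_options()
        self.add_passthrough_option('--total-words', type='int', default=5,
                                    help='how many top words to report')

    def mapper(self, _, line):
        for word in line.split():
            yield word, 1

    def reducer(self, word, counts):
        # selecting the top self.options.total_words is omitted here
        yield word, sum(counts)

if __name__ == '__main__':
    MRTopWords.run()
```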
Specifying number of reducers in streaming jar options
```bash
-D mapreduce.job.reduces=1
```
Specifying key comparator in streaming jar options
```bash
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options=-nr
```
Passing streaming jar options via MRJob
```python
from mrjob.job import MRJob

class MyMRJob(MRJob):
    JOBCONF = {
        'mapreduce.job.reduces': 1,
        'mapred.output.key.comparator.class':
            'org.apache.hadoop.mapred.lib.KeyFieldBasedComparator',
        'mapred.text.key.comparator.options': '-nr',  # compare keys numerically in reverse order
    }

    def mapper(self, key, value):
        ...

    def reducer(self, key, values):
        ...
```
Dynamic jobconf
```python
from mrjob.job import MRJob

class MRLogs(MRJob):
    def jobconf(self):
        conf = super().jobconf()
        conf.update({
            'opt1': 'value',
            'opt2': 'value2',
        })
        return conf
```
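For instance, a hypothetical job that picks the reducer count at launch time from one of its own options (the option name is ours):

```python
from mrjob.job import MRJob

class MRLogs(MRJob):
    def configure_options(self):
        super().configure_options()
        self.add_passthrough_option('--num-reducers', type='int', default=1)

    def jobconf(self):
        conf = super().jobconf()
        # decided dynamically at launch, unlike a static JOBCONF dict
        conf['mapreduce.job.reduces'] = self.options.num_reducers
        return conf
```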
Outputting CSV or SequenceFile
```python
import mrjob.protocol
from mrjob.job import MRJob

class MRLogs(MRJob):
    OUTPUT_PROTOCOL = mrjob.protocol.RawValueProtocol  # do not output keys, only values

    def hadoop_output_format(self):
        if some_condition:  # placeholder for your own check
            return 'org.apache.hadoop.mapred.SequenceFileOutputFormat'  # emit key-values as a SequenceFile
        return super().hadoop_output_format()

    def reducer(self, key, values):
        yield key, 'my,comma,separated,data'
```
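With `RawValueProtocol` the value string is written out verbatim, so CSV quoting is on you; a minimal sketch using the standard `csv` module (the helper name is ours):

```python
import csv
import io

def csv_line(fields):
    # serialize one row; csv handles quoting of embedded commas/quotes
    buf = io.StringIO()
    csv.writer(buf).writerow(fields)
    return buf.getvalue().rstrip('\r\n')

# e.g. inside a reducer: yield None, csv_line([key, total, average])
```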
Enable Snappy compression of job's output (Hadoop Streaming options)
```bash
-D mapred.output.compress=true \
-D mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec \
-D mapred.output.compression.type=BLOCK
```
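The same three settings can also be passed from mrjob via the `JOBCONF` attribute shown earlier; a sketch (the class name is ours):

```python
from mrjob.job import MRJob

class MRSnappyOut(MRJob):
    JOBCONF = {
        'mapred.output.compress': 'true',
        'mapred.output.compression.codec':
            'org.apache.hadoop.io.compress.SnappyCodec',
        'mapred.output.compression.type': 'BLOCK',
    }
```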
Configuring options
```python
from mrjob.job import MRJob

class MRLogs(MRJob):
    def configure_options(self):
        super().configure_options()
        self.add_passthrough_option(
            '--myopt', choices=['a', 'b'], default='a', help='My option'
        )

    def jobconf(self):
        if self.options.myopt:
            do_smthng()  # placeholder

    def mapper(self, _, line):
        if self.options.myopt:
            do_smthng()

    def reducer(self, key, values):
        if self.options.myopt:
            do_smthng()
```
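With the definition above the option is set at launch, e.g. `python3 mr_logs.py -r hadoop --myopt b input.txt`, and `self.options.myopt` is then available in `jobconf()` and in every mapper/reducer.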
Using distributed cache with Hadoop Streaming
```bash
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input "/path/to/input/dir/" \
    -output "/path/to/output/dir/" \
    -mapper "mymapper.py" \
    -reducer "myreducer.py" \
    -file /path/to/my/file
```
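Note: newer Hadoop releases deprecate `-file` in favour of the generic `-files` option; both ship the listed file to every task's working directory through the distributed cache.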
Using distributed cache in mrjob
```python
from mrjob.job import MRJob

class MRCityStats(MRJob):
    def configure_options(self):
        super().configure_options()
        self.add_file_option('--my-file')
        self.add_passthrough_option('--some-other-option')

    def mapper(self, _, line):
        # the file is copied into every mapper/reducer's working directory
        with open(self.options.my_file) as f:
            do_smthng()

    def reducer(self, key, values):
        with open(self.options.my_file) as f:
            do_smthng()
```
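A typical invocation would be `python3 mr_city_stats.py -r hadoop --my-file /local/path/lookup.txt input.txt` (the file name is ours); mrjob uploads the file and rewrites `self.options.my_file` so that `open()` finds the local copy in each task's working directory.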