Just in case you need to write something with mrjob or raw Hadoop Streaming.
Hadoop Streaming example
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -D mapreduce.job.reduces=1 \
    -D my.java.option=3 \
    -input "/path/to/input/dir/" \
    -output "/path/to/output/dir/" \
    -mapper "mymapper.py" \
    -reducer "myreducer.py"
Python mrjob on YARN example (put this into run.sh in the project)
python3.6 mr_job.py \
    -r hadoop \
    --hadoop-streaming-jar=/usr/hdp/2.4.0.0-169/hadoop-mapreduce/hadoop-streaming.jar \
    --python-bin=python3.6 \
    --total-words=5 input1.txt input2.txt
python3 script.py -r hadoop hdfs:///user/raj_ops/testHW1_1.txt \
    --hadoop-streaming-jar=/usr/hdp/2.6.4.0-91/hadoop-mapreduce/hadoop-streaming.jar
python3 mr_log.py -r local ../test_data/log_full.txt > answer.csv
Specifying the number of reducers in streaming jar options
-D mapreduce.job.reduces=1
Specifying key comparator in streaming jar options
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options=-nr
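For intuition: `-n` makes the comparator treat keys as numbers and `-r` reverses the order. A Python sketch of what `-nr` changes (not the Java comparator itself):

```python
keys = ['10', '2', '33']

# Default streaming sort is lexicographic on the key bytes: '10' sorts before '2'
assert sorted(keys) == ['10', '2', '33']

# `-nr`: numeric comparison, reversed
assert sorted(keys, key=float, reverse=True) == ['33', '10', '2']
```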
Passing streaming jar options via MRJob
class MyMRJob(MRJob):

    JOBCONF = {
        'mapreduce.job.reduces': 1,
        'mapred.output.key.comparator.class':
            'org.apache.hadoop.mapred.lib.KeyFieldBasedComparator',
        'mapred.text.key.comparator.options': '-nr',  # compare keys numerically in reverse order
    }

    def mapper(self, _, line):
        ...

    def reducer(self, key, values):
        ...
Dynamic jobconf
class MRLogs(MRJob):

    def jobconf(self):
        conf = super().jobconf()
        conf.update({
            'opt1': 'value',
            'opt2': 'value2',
        })
        return conf
Outputting CSV or SequenceFile
class MRLogs(MRJob):

    OUTPUT_PROTOCOL = mrjob.protocol.RawValueProtocol  # do not output keys, only values

    def hadoop_output_format(self):
        if some_condition:
            # output key-values in a SequenceFile
            return 'org.apache.hadoop.mapred.SequenceFileOutputFormat'
        return super().hadoop_output_format()

    def reducer(self, key, values):
        yield key, 'my,comma,separated,data'
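With RawValueProtocol the value is written verbatim, so the reducer is responsible for CSV formatting itself. A sketch of safe quoting with the stdlib csv module (the field values here are made up):

```python
import csv
import io

def to_csv_row(fields):
    """Format one record as a single CSV line; with RawValueProtocol the
    value you yield is written to the output file as-is, so quoting commas
    inside fields is up to you."""
    buf = io.StringIO()
    csv.writer(buf).writerow(fields)
    return buf.getvalue().rstrip('\r\n')
```

The reducer would then `yield None, to_csv_row([key, total, average])` instead of hand-joining strings.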
Enable Snappy compression of job's output (Hadoop Streaming options)
-D mapred.output.compress=true
-D mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec
-D mapred.output.compression.type=BLOCK
Configuring options
class MRLogs(MRJob):

    def configure_options(self):
        super().configure_options()
        self.add_passthrough_option(
            '--myopt', choices=['a', 'b'], default='a',
            help='My option'
        )

    def jobconf(self):
        if self.options.myopt:
            do_smthng()

    def mapper(self, _, line):
        if self.options.myopt:
            do_smthng()

    def reducer(self, key, values):
        if self.options.myopt:
            do_smthng()
Using distributed cache with Hadoop Streaming
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input "/path/to/input/dir/" \
    -output "/path/to/output/dir/" \
    -mapper "mymapper.py" \
    -reducer "myreducer.py" \
    -file /path/to/my/file
Using distributed cache in mrjob
class MRCityStats(MRJob):

    def configure_options(self):
        super().configure_options()
        self.add_file_option('--my-file')
        self.add_passthrough_option('--some-other-option')

    def mapper(self, _, line):
        # the file is copied into every mapper/reducer task's working directory
        with open(self.options.my_file) as f:
            do_smthng()

    def reducer(self, key, values):
        with open(self.options.my_file) as f:
            do_smthng()
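Re-reading the cached file for every record is wasteful; parsing it once per task is usually cheaper. A sketch assuming a tab-separated key<TAB>value lookup file (the file format is an assumption, not fixed by mrjob):

```python
def load_lookup(path):
    """Read a tab-separated lookup file into a dict. With add_file_option,
    Hadoop's distributed cache copies the file into each task's working
    directory, so `path` is just the local file name."""
    lookup = {}
    with open(path) as f:
        for line in f:
            key, _, value = line.rstrip('\n').partition('\t')
            lookup[key] = value
    return lookup
```

In mrjob you would call this once from `mapper_init` / `reducer_init` and keep the resulting dict on `self` for the rest of the task.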