Supplementary Material - Using the Streaming API with Python

EXC 2019: Antonios Katsarakis, Chris Vasiladiotis, Ustiugov Dmitrii, Volker Seeker, Pramod Bhatotia

Based on previous material from Matt Pugh, Artemiy Margaritov, Michail Basios, Sasa Petrovic, Stratis Viglas & Kenneth Heafield

Introduction

The purpose of working through this document is to make sure you understand how to launch streaming jobs with Hadoop, and that you know the basic Python syntax required to undertake the assignment. The majority of the following commands must be entered in a terminal. Required reading:

Creating a Word-Counting Program with Python 2.7

In this subsection, we will write a mapper and a reducer that count the number of words in movie titles. Initially, we will test the code locally on small files before using it in a streaming MapReduce job – it’s much easier to debug coding errors on your personal computer than on the cluster.

Mapper

Create a file somewhere in your home directory called mapper.py – there are a number of Python IDEs available, including PyCharm on DICE machines. Copy the code of the mapper provided in the Designing a Solution using Hadoop Streaming section into mapper.py. It’s worth typing this out by hand rather than copy/pasting, to better understand what the code is doing. Make sure you save the file mapper.py.
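
If you do not have those notes to hand, the sketch below shows roughly what such a mapper looks like. It assumes the input is title.basics.tsv, that primaryTitle is the third tab-separated column, and that words are emitted as word|1 pairs (the '|' separator matches the sanity-check output later in this document); the code in the course notes is the authoritative version and may differ in detail.

#!/usr/bin/python2.7
# Sketch of a word-count mapper (the course notes' version is authoritative).
# Reads title.basics.tsv records from stdin, extracts the primaryTitle field
# (third tab-separated column) and emits one word|1 pair per word.
import sys

for line in sys.stdin:
    parts = line.strip().split('\t')
    if len(parts) < 3:
        continue                      # skip malformed lines
    primaryTitle = parts[2]
    for word in primaryTitle.split():
        print('%s|%d' % (word, 1))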

Reducer

Create a file called reducer.py in the same directory as mapper.py, and copy the code of the memory-efficient reducer into it. Once again, save this file before continuing.
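
Again, the authoritative code is in the course notes; the sketch below only illustrates the idea behind a memory-efficient reducer. Because its input arrives sorted by key, it keeps a running count for the current word only, rather than a dictionary of all words, and it assumes the word|count format emitted by the mapper sketch above.

#!/usr/bin/python2.7
# Sketch of a memory-efficient reducer (the course notes' version is
# authoritative). Input lines are word|count pairs sorted by word, so only
# the running total for the current word is kept in memory.
import sys

current_word = None
current_count = 0

for line in sys.stdin:
    word, count = line.strip().split('|', 1)
    count = int(count)
    if word == current_word:
        current_count += count
    else:
        if current_word is not None:
            print('%s|%d' % (current_word, current_count))
        current_word = word
        current_count = count

if current_word is not None:
    print('%s|%d' % (current_word, current_count))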

Local Testing

We perform local testing using typical UNIX-style piping; our test will take the form:

cat <data> | map | sort | reduce

This emulates, in a non-distributed manner, the same pipeline that Hadoop performs when streaming in a distributed way. Make sure that the files mapper.py and reducer.py have execute permissions:

chmod u+x mapper.py
chmod u+x reducer.py

Try the following command and explain the results (hint: type man sort in your terminal window to find out more about the sort command):

cat title.basics.tsv | ./mapper.py | sort -t'|' -k1,1 | ./reducer.py

The file title.basics.tsv can be found on HDFS under /data/supplementary/; copy it to your local filesystem (for example with hdfs dfs -get /data/supplementary/title.basics.tsv .) before running the pipeline above.

Sanity Check

The above command should produce the following output:

and|1
Are|1
Betrayed|1
Coming|1
Darren|1
Day|1
Don|1
Eleven|1
Films|1
King|1
Mafia|1
Man|1
McGannon|1
of|3
One|1
Out!|1
Sell|1
Student|1
Swanson)|1
the|2
The|3
(The|1
to|1
Town!|1
Valley|1
Who|1
Works|1
Years|1
Zombies|1

TASK: Count the number of words after editing the primaryTitle field in the title.basics.tsv file.

Running a Streaming MapReduce Job

After successfully running the code locally, the next step is to run it on the Hadoop cluster. You have your mapper, mapper.py, your reducer, reducer.py, and input and output locations on HDFS, <input> and <output> respectively. We always have to specify /opt/hadoop/hadoop-2.9.2/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar as the jar to run (Hadoop is written in Java and requires this). The mapper and reducer are specified through the -mapper and -reducer options. We also have to package our mapper and reducer files in the job submission:

hadoop jar /opt/hadoop/hadoop-2.9.2/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar \
 -input <input> \
 -output <output> \
 -mapper mapper.py \
 -reducer reducer.py \
 -file mapper.py \
 -file reducer.py

Here, the -file option specifies a file to be packaged with the job. We repeat this flag for as many files as we need to package. This can be very useful for also packaging any auxiliary files your program might use (dictionaries, configuration files, etc).
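
As an illustration, suppose you packaged a plain-text stop-word list with an extra -file stopwords.txt option (the file name is hypothetical). Packaged files are placed in the task's working directory, so the mapper can open them by their bare file name:

#!/usr/bin/python2.7
# Sketch of a mapper that uses a packaged auxiliary file (hypothetical
# stopwords.txt, shipped with the job via an extra -file option).
# Packaged files appear in the task's working directory, so they can be
# opened by their bare file name.
import sys

with open('stopwords.txt') as f:
    stopwords = set(word.strip() for word in f)

for line in sys.stdin:
    for word in line.strip().split():
        if word not in stopwords:
            print('%s|%d' % (word, 1))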

Running a Streaming MapReduce Job with a Combiner

Recall that a combiner can be used to accelerate a MapReduce job. For the word-counting task, the combiner can be identical to the reducer (note that for other tasks the combiner may need to differ from the reducer). Consequently, copy the file reducer.py to combiner.py. The combiner is added to a MapReduce job with the -combiner option, and we should also package it in the job submission with -file. The full command to launch the MapReduce job is:

hadoop jar /opt/hadoop/hadoop-2.9.2/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar \
 -input <input> \
 -output <output> \
 -mapper mapper.py \
 -combiner combiner.py \
 -reducer reducer.py \
 -file mapper.py \
 -file combiner.py \
 -file reducer.py

Multiple Input Files

In the above, the <input> provided to -input can be a single file or a directory. In the latter case, all files contained within it will be sent to mappers. If the chosen directory contains N files, then at least N map tasks will be scheduled; for large values of N, this will slow down the execution of the job.

If we needed to include more than one input file selectively, say title.basics.tsv and name.basics.tsv, we could add them with multiple -input options in the job specification, just as we did with -file:

hadoop jar /opt/hadoop/hadoop-2.9.2/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar \
 -input title.basics.tsv \
 -input name.basics.tsv \
 ...

We would then need to add additional logic to our mapper so that it can differentiate between the two sources, since it won’t know from which file the line it is currently processing originates. We know that title.basics.tsv has exactly 9 data attributes and that name.basics.tsv has exactly 6. Differentiating between the two is therefore simple:

#!/usr/bin/python2.7
import sys

for line in sys.stdin:
    parts = line.strip().split('\t')   # We now have a list of either 9 or 6 elements

    if len(parts) == 6:                # We're processing a line from name.basics.tsv
        nconst = parts[0]
        knownForTitles = parts[5]
        # ... process name.basics.tsv fields ...
    else:                              # We're processing a line from title.basics.tsv
        tconst = parts[0]
        primaryTitle = parts[2]
        # ... process title.basics.tsv fields ...
How you process these two record types in your reducer stage is down to your design. If you were looking to join the records on knownForTitles, you would need to encode the data in such a way that records from name.basics.tsv and title.basics.tsv that refer to the same tconst arrive at the same reducer.
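
One common design (a sketch only, not the required solution) is to have the mapper emit the tconst as the key and tag each value with its source file, so that Hadoop's default partitioning on the key delivers matching records to the same reducer:

#!/usr/bin/python2.7
# Sketch of a mapper that tags records by source for a reduce-side join
# (one possible design, not the required solution). Records from both
# files are emitted with a tconst as the key, so matching records meet
# at the same reducer.
import sys

for line in sys.stdin:
    parts = line.strip().split('\t')
    if len(parts) == 6:                       # name.basics.tsv
        nconst = parts[0]
        knownForTitles = parts[5]             # comma-separated list of tconsts
        for tconst in knownForTitles.split(','):
            print('%s\tNAME\t%s' % (tconst, nconst))
    elif len(parts) == 9:                     # title.basics.tsv
        tconst = parts[0]
        primaryTitle = parts[2]
        print('%s\tTITLE\t%s' % (tconst, primaryTitle))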

Single Output File

Each reducer in a MapReduce job writes its result to a separate file, so the total result of a MapReduce job is spread across multiple files; the number of output files is equal to the number of reducers. Distributing the result across multiple files can be a problem if a task requires (1) finding a global metric (e.g., a minimum or maximum) or (2) producing the output in sorted order. To solve this problem, we can limit the number of reducers to 1. There will then be only one reducer instance in the MapReduce job; it receives all of the map output and creates a single output file. To limit the number of reducers, we need to add the -D mapred.reduce.tasks=1 option to the command. The full command to launch the MapReduce job is:

hadoop jar /opt/hadoop/hadoop-2.9.2/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar \
 -D mapred.reduce.tasks=1 \
 -input <input> \
 -output <output> \
 -mapper mapper.py \
 -reducer reducer.py \
 -file mapper.py \
 -file reducer.py

NOTE: The order of the arguments is significant. Generic options specified with -D must come before command options such as -input.

NOTE: A single reducer receiving huge amounts of data is inefficient and can become a bottleneck, so you should use many reducers instead of one where applicable. For situations where a single reducer is necessary, running more than one MapReduce round can alleviate the problem by decreasing the amount of data the single reducer has to process.

TASK: Run the example of the wordcount problem using only one reducer. Observe the output folder and the number of files produced.

Setting Job Configuration

Various job options can be specified on the command line; we will cover the most commonly used ones in this section. The general syntax for specifying additional configuration variables is -D <name>=<value>

To avoid having your job named something like streamjob5025479419610622742.jar, you can specify an alternative name through the mapred.job.name variable:

-D mapred.job.name="My job"

TASK: Run the word counting example again, this time naming your job "Word counting <matriculation_number>", where <matriculation_number> is your matriculation number.

After you run the job (and preferably before it finishes), open the browser and go to the Hadoop web interface. In the list of running jobs look for the job with the name you gave it and click it. You can see various statistics about your job – try to find the number of reducers used. How many reducers did you use? If your job finished before you had a chance to open the browser, it will be in the list of finished jobs, not the list of running jobs, but you can still see all the same information by clicking on it.

Secondary Sorting

As was mentioned earlier, the key/value pairs are obtained by splitting the mapper output on the first tab character in the line. This can be changed using the stream.map.output.field.separator and stream.num.map.output.key.fields variables. For example, if the key needs to be everything up to the second - character in the line, we would add the following:

-D stream.map.output.field.separator=- \
-D stream.num.map.output.key.fields=2
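
To see what these two settings do, the stand-alone snippet below (plain Python, not part of any Hadoop command, with a made-up sample line) emulates how a single line of mapper output would be split into key and value:

# Stand-alone illustration (hypothetical sample line): with the separator
# set to '-' and 2 key fields, the key is everything up to the second '-'
# and the value is the rest of the line.
line = "2019-01-15 some value"
parts = line.split('-')
key = '-'.join(parts[:2])     # "2019-01"
value = '-'.join(parts[2:])   # "15 some value"
print(key)
print(value)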

Hadoop also comes with a partitioner class org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner which is useful for cases where you want to perform a secondary sort on the keys. Imagine you have the following list of IPs:

192.168.2.1
190.191.34.38
161.53.72.111
192.168.1.1
161.53.72.23

You want to partition the data so that addresses with the same first 16 bits are processed by the same reducer. However, you also want each reducer to see its data sorted according to the first 24 bits of the address. Using the partitioner class mentioned above, you can tell Hadoop how to group the data to be processed by the reducers. You do this using the following options:

-D mapreduce.map.output.key.field.separator=.
-D num.key.fields.for.partition=2

The first option tells Hadoop which character to use as a separator (just like in the previous example), and the second tells it how many fields from the key to use for partitioning. Knowing this, here is how we would solve the IP address example:

hadoop jar /opt/hadoop/hadoop-2.9.2/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar \
 -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
 -D mapreduce.map.output.key.field.separator=. \
 -D stream.map.output.field.separator=. \
 -D stream.num.map.output.key.fields=3 \
 -D num.key.fields.for.partition=2 \
 -input <input> \
 -output <output> \
 -mapper cat \
 -reducer cat \
 -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

The line with -D num.key.fields.for.partition=2 tells Hadoop to partition IPs based on the first 16 bits (first two numbers), and -D stream.num.map.output.key.fields=3 tells it to sort the IPs according to everything before the third separator (the dot in this case) – this corresponds to the first 24 bits of the address.
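
To convince yourself of what this configuration achieves, the short local emulation below (plain Python, no Hadoop involved) partitions the sample IPs on their first two octets and sorts each partition on the first three, approximating what the partitioner and comparator do:

# Local emulation (no Hadoop): partition the sample IPs on the first two
# octets (first 16 bits) and sort each partition on the first three octets
# (first 24 bits), mirroring the streaming options above.
ips = ['192.168.2.1', '190.191.34.38', '161.53.72.111',
       '192.168.1.1', '161.53.72.23']

partitions = {}
for ip in ips:
    octets = ip.split('.')
    partitions.setdefault('.'.join(octets[:2]), []).append(ip)

for part in sorted(partitions):
    # each "reducer" sees its IPs sorted on the first three octets
    for ip in sorted(partitions[part], key=lambda x: '.'.join(x.split('.')[:3])):
        print(ip)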

TASK: Copy the file /data/supplementary/secondary.txt to your input directory. Lines in this file have the following format:

last_name|first_name|address|phone_number

Note that the '|' (pipe) character is used as a field separator.

Using what you have learned in this section, your task is to:

  1. Partition the data so that all people with the same last name go to the same reducer.
  2. Partition the data so that all people with the same last name go to the same reducer, and also make sure that the lines are sorted according to first name.

Partitioning the Data, and Secondary Sorting

We want to partition the data so that all the people with the same first and last name go to the same reducer, and that within each reducer the lines are sorted according to address. An implementation is given below:

hadoop jar /opt/hadoop/hadoop-2.9.2/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar \
 -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
 -D mapreduce.map.output.key.field.separator="|" \
 -D mapreduce.output.textoutputformat.separator="|" \
 -D stream.map.output.field.separator="|" \
 -D stream.num.map.output.key.fields=3 \
 -D num.key.fields.for.partition=2 \
 -D mapreduce.partition.keypartitioner.options=-k1,2 \
 -D mapreduce.partition.keycomparator.options=-k3 \
 -input /data/supplementary/secondary.txt \
 -output /user/$USER/output \
 -mapper cat \
 -reducer cat \
 -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

hdfs dfs -cat /user/$USER/output/* returns the expected output as follows:

Stanley|Cup|Elm street 1|555-1002
Stanley|Paul|Elm street 3|555-1002
Simmons|Gene|Elm street 1|555-1000
Singer|Eric|Elm street 2|555-1001
Thayer|Tommy|Elm street 4|555-1003
Simmons|Gene|Elm street 5|555-666

The key points are the configurations of the KeyFieldBasedComparator class and the KeyFieldBasedPartitioner class. The documentation provides further examples of how you may use these features.

Note that when multiple sort options are given, they must be enclosed in quotation marks, unlike the single -k3 option in the previous example. If we wanted to sort by the first column in descending order, and by the second and then the third column in ascending order:

-D mapreduce.partition.keypartitioner.options=-k1,2 \
-D mapreduce.partition.keycomparator.options="-k1,1r -k2,2 -k3,3"

This is called a composite key, and yields the result:

Stanley|Cup|Elm street 1|555-1002
Stanley|Paul|Elm street 3|555-1002
Thayer|Tommy|Elm street 4|555-1003
Singer|Eric|Elm street 2|555-1001
Simmons|Gene|Elm street 1|555-1000
Simmons|Gene|Elm street 5|555-666

Note that the results will be partitioned across multiple reducers and that the comparator sorting options are applied after the partitioning has taken place.

If your keys are numeric, you need to use the -n modifier. Imagine the first key field is a name (string) and the second is an age (integer); to sort both in descending order (note that -n and -r combine into nr after the field index):

-D mapreduce.partition.keycomparator.options="-k1,1r -k2,2nr"
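
As a quick local illustration of these composite sort options (with made-up (name, age) records that have nothing to do with secondary.txt), two stable sorts applied in reverse key order reproduce the "-k1,1r -k2,2nr" ordering:

# Local illustration with made-up (name, age) records: emulate
# "-k1,1r -k2,2nr" using two stable sorts, secondary key applied first.
rows = [('alice', 30), ('bob', 25), ('alice', 41)]
rows.sort(key=lambda r: r[1], reverse=True)   # -k2,2nr: age, numeric, descending
rows.sort(key=lambda r: r[0], reverse=True)   # -k1,1r : name, text, descending
for name, age in rows:
    print('%s|%d' % (name, age))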

Managing Hadoop Jobs

Listing Jobs

Running the following command will list all (completed and running) jobs on the cluster:

mapred job -list all

This will result in a table like this being displayed:

JobId                 State     StartTime     UserName Priority SchedulingInfo
  job_201009151640_0001 2     1285081662441 s0894589 NORMAL   NA
  job_201009151640_0002 2     1285081976156 s0894589 NORMAL   NA
  job_201009151640_0003 2     1285082017461 s0894589 NORMAL   NA
  job_201009151640_0004 2     1285082159071 s0894589 NORMAL   NA
  job_201009151640_0005 2     1285082189917 s0894589 NORMAL   NA
  job_201009151640_0006 2     1285082275965 s0894589 NORMAL   NA
  job_201009151640_0009 2     1285083343068 s0894589 NORMAL   NA
  job_201009151640_0010 3     1285106676902 s0894589 NORMAL   NA
  job_201009151640_0012 3     1285106959588 s0894589 NORMAL   NA
  job_201009151640_0013 3     1285107094387 s0894589 NORMAL   NA
  job_201009151640_0014 2     1285107283359 s0894589 NORMAL   NA
  job_201009151640_0015 2     1285109169514 s0894589 NORMAL   NA
  job_201009151640_0016 2     1285109271188 s0894589 NORMAL   NA
  job_201009151640_0018 1     1285148710573 s0894589 NORMAL   NA

Job Status

To get the status of a particular job, we can use

mapred job -status $jobid

Here, $jobid is the ID of a job, found in the first column of the table above. The status shows the percentage of completed mappers and reducers, along with a tracking URL and the location of a file with all the information about the job. We will soon see that the web interface provides much more detail.

Kill a Job

To kill a job, run the following command, where $jobid is the ID of the job you want to kill:

mapred job -kill $jobid

TASK: Run through the following exercise.

To try this, copy the file /data/supplementary/sleeper.py somewhere on your local filesystem (not HDFS) and run it as a streaming job, specifying sleeper.py as the mapper with no reducers (remember to use the -files option). You can set the input to be anything you like, because the mapper doesn’t actually do anything (except wait for you to kill it); also remember to set the output to a directory on HDFS that does not exist. It may help to give your job a familiar name – this will reduce the time needed to find its ID later.

Open another terminal, log into the Hadoop cluster, and list all the jobs. Find the ID of the job you just started (use the name you gave it for faster searching). Find out the status of your job, and finally kill it. After you run the kill command, look at the terminal where you originally started the job and watch the job die.

Web Interface

All of the previous actions can also be performed using the web interface.

The jobtracker web interface shows a list of running, completed, and failed jobs. Clicking on a job ID is similar to requesting its status from the command line, but it shows much more detail, including the number of bytes read from and written to the filesystem, the number of failed/killed task attempts, and graphs of job completion.

Further Study

You now have a broad knowledge of the way in which Hadoop is used. If you’d like to know more, please use the following resources. These are presented as optional reading and are good places to consult if you’re stuck.

  1. Official Hadoop Documentation
  2. Hadoop Cheat Sheet
  3. Hadoop for Dummies Cheat Sheet
  4. YouTube Playlist – Hadoop Tutorials