MapReduce Lab - Hadoop & Spark

Preamble

During my PhD, I was a teaching assistant at Sorbonne University in Paris. In March 2020, working from home during the Covid-19 lockdown, I wrote this lab in English for the Master 1 Cloud Computing students, as a follow-up to a MapReduce class I taught in English.

The lab is closely guided at the beginning, then lets the students progress on their own as the questions gradually become more difficult.

Sources are mainly the Hadoop Getting Started (Setting up a Single Node Cluster) and the Hadoop MapReduce tutorial.


Introduction

The purpose of this lab is to set up and configure a single-node Hadoop installation, so that you can quickly perform simple operations using Hadoop MapReduce and the Hadoop Distributed File System (HDFS).

Evaluation

This lab can be assessed. Open a new document (text file, Word, LibreOffice…) and write your first name, last name, student number and email.

For each assessed question in red, report the question and its number in your document, and write your answer or copy/paste code/result below the question.

Prerequisites

Having basic Unix knowledge (commands: cd, cat, ls, man…).

Having Ubuntu on VirtualBox with the following configuration:

  • Ubuntu Desktop 18.04 LTS (check updates and third-party software during install)
  • 4096 MB of RAM (at least 2048 MB)
  • 20 GB of virtual hard disk (VDI)
  • VirtualBox Guest Additions (after finishing the Ubuntu install)
  • Java 8 (OpenJDK: apt install openjdk-8-jdk)

1. Environment install

Hadoop is written in Java and requires the Java Runtime Environment to run.

1.1. Java

The OpenJDK 8 should be installed (cf. Prerequisites). A Java Development Kit (JDK) is a superset of Java Runtime Environment (JRE) and contains everything that is in JRE, plus developing tools (compilers, debuggers…).

Let’s verify the Java version installed.

Launch a terminal from the Applications menu (bottom left) or by pressing CTRL + ALT + T. Then execute the following command in the terminal:

$ java -version

The terminal should return:

openjdk version "1.8.0_242"
...

If this is not the case, execute the following command in the terminal to install OpenJDK 8, then verify the version again (apt will ask for your superuser password and ask you to confirm the install by typing Y for yes):

$ sudo apt install openjdk-8-jdk

If the problem persists, select the correct Java version from the following command:

$ sudo update-alternatives --config java

1.2. SSH

ssh must be installed and sshd must be running to use the Hadoop scripts that manage remote Hadoop daemons. Additionally, it is recommended that pdsh also be installed for better ssh resource management.

Execute the following commands to install ssh, then pdsh (apt will ask for your superuser password and ask you to confirm the install by typing Y for yes):

$ sudo apt install ssh
$ sudo apt install pdsh

2. Apache Hadoop

As a reminder, Hadoop MapReduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.

2.1. Preparing Hadoop

2.1.1. Download Hadoop

Download the latest stable version of Hadoop binary (version 3.1.2) directly from Apache using the wget command:

$ wget http://archive.apache.org/dist/hadoop/common/hadoop-3.1.2/hadoop-3.1.2.tar.gz

Then extract the archive using the tar command:

$ tar xvzf hadoop-3.1.2.tar.gz

And move into the extracted archive:

$ cd hadoop-3.1.2

2.1.2. Hadoop and JAVA_HOME

Hadoop needs to know where the Java Runtime Environment is. Execute the following command to get the Java install path and copy the displayed result:

$ dirname $(dirname $(readlink -f $(which javac)))

Then edit the file etc/hadoop/hadoop-env.sh to set the JAVA_HOME parameter to the Java install path. You can use gedit, vi, emacs, nano… to edit files.

$ gedit etc/hadoop/hadoop-env.sh &

On line 54, you find the following comment:

# export JAVA_HOME=

Uncomment this line and paste the Java install path after the equals sign:

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

2.1.3. First run

Try the following command:

$ bin/hadoop

Question 1: What is displayed by the bin/hadoop command?

Question 2: Write the first line displayed by the bin/hadoop command in your document.

2.2. Two examples

By default, Hadoop is configured to run in a non-distributed mode, as a single Java process. This is useful for debugging.

Hadoop comes with different MapReduce examples.

2.2.1. Grep example

The first example finds every match of the given regular expression in the files of an input directory, and writes the result files into an output directory.
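To make the idea concrete before running the real job, here is a minimal plain-Java sketch of "find every match of a regular expression and count it". It is not the code of the Hadoop example: the class GrepSketch, the method countMatches and the three sample lines are made up for illustration, and only the JDK regex classes are used.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class GrepSketch {
    // Counts every match of the regular expression across the given lines.
    static Map<String, Integer> countMatches(List<String> lines, String regex) {
        Pattern pattern = Pattern.compile(regex);
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            Matcher matcher = pattern.matcher(line);
            while (matcher.find()) {
                // matcher.group() is the text matched by the whole expression
                counts.merge(matcher.group(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical sample lines, not taken from the Hadoop configuration files.
        List<String> lines = Arrays.asList(
            "<name>dfs.replication</name>",
            "set dfs.replication and dfs.blocksize",
            "nothing to see here");
        System.out.println(countMatches(lines, "dfs[a-z.]+"));
        // prints {dfs.blocksize=1, dfs.replication=2}
    }
}
```

The Hadoop job does the same kind of matching, but spreads the work over map and reduce tasks and writes the counts to files instead of printing them.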

Create an input directory which will contain input data:

$ mkdir input

Then copy the extracted configuration files to use as input:

$ cp etc/hadoop/*.xml input

Read carefully the following command to understand the first example, then execute it:

$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.2.jar grep input output 'dfs[a-z.]+'

Question 3: Explain each argument of the above command:

  • bin/hadoop: …
  • jar: …
  • share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.2.jar: …
  • grep: …
  • input: …
  • output: …
  • 'dfs[a-z.]+': …

Output is written to the given output directory:

$ cat output/*

Question 4: Which dfs… is displayed by the cat output/* command?

Question 5: Looking only at the output directory and its contents, how can you tell whether the job succeeded?

2.2.2. Word Count example

Let’s see another MapReduce example: WordCount. WordCount is a simple application that counts the number of occurrences of each word in a given input set. The input set will be a real text from a book: The Adventures of Tom Sawyer by Mark Twain.

Remove previous content of the input directory:

$ rm input/*

Download Mark Twain’s book from Project Gutenberg into the input directory (filename 74-0.txt):

$ wget https://www.gutenberg.org/files/74/74-0.txt -P input

Run the WordCount example:

$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.2.jar wordcount input output

An error should appear.

Question 6: What is the error message?

Question 7: How can you solve this error? 2 possible answers are expected.

After solving the error, execute the command again and open the output result file in a text editor for convenience.

In the result file, the word “you” appears many times in different forms: you, you!, you!), YOU, etc.

Question 8: How many times does the exact word you appear in the text?

Let’s take a closer look at the WordCount example.

2.3. A detailed example: WordCount

2.3.1. MapReduce job

A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file-system. The framework takes care of scheduling tasks, monitoring them and re-executing the failed tasks.
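As a rough illustration of this paragraph, the sketch below imitates the split, parallel map and merge steps with plain JDK threads. It is only an analogy under assumptions: the class ChunkedJobSketch and its methods are made up, no Hadoop code is involved, and the monitoring and re-execution of failed tasks done by the framework are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ChunkedJobSketch {
    // Splits the input into independent chunks of at most chunkSize lines.
    static List<List<String>> split(List<String> lines, int chunkSize) {
        List<List<String>> chunks = new ArrayList<>();
        for (int start = 0; start < lines.size(); start += chunkSize) {
            chunks.add(lines.subList(start, Math.min(start + chunkSize, lines.size())));
        }
        return chunks;
    }

    // Stand-in for a map task: counts the words of one chunk only.
    static Map<String, Integer> countChunk(List<String> chunk) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : chunk) {
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    // Runs one task per chunk on a thread pool, then merges the partial counts.
    static Map<String, Integer> run(List<String> lines, int chunkSize) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Map<String, Integer>>> futures = new ArrayList<>();
            for (List<String> chunk : split(lines, chunkSize)) {
                Callable<Map<String, Integer>> task = () -> countChunk(chunk);
                futures.add(pool.submit(task));
            }
            Map<String, Integer> total = new TreeMap<>();
            for (Future<Map<String, Integer>> future : futures) {
                future.get().forEach((word, n) -> total.merge(word, n, Integer::sum));
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a b", "b c", "c c"), 2));
        // prints {a=1, b=2, c=3}
    }
}
```

Because each chunk is counted without looking at the others, the tasks can run in any order and on any thread, which is the property that lets Hadoop distribute them over many machines.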

More information is available in Appendix B if you have time.

2.3.2. Inputs and outputs

The MapReduce framework operates exclusively on <key, value> pairs, that is, the framework views the input to the job as a set of <key, value> pairs and produces a set of <key, value> pairs as the output of the job, conceivably of different types.

The key and value classes have to be serializable by the framework and hence need to implement the Writable interface. Additionally, the key classes have to implement the WritableComparable interface to facilitate sorting by the framework.
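The sketch below shows what this contract looks like for a custom key, using only the JDK so that it can be compiled without Hadoop. The class WordLineKey is hypothetical; in a real job it would be declared as implementing org.apache.hadoop.io.WritableComparable, whose write and readFields methods have the signatures used here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical key made of a word and the number of the line it was found on.
class WordLineKey implements Comparable<WordLineKey> {
    private String word = "";
    private int line;

    // A no-argument constructor lets the framework create an empty instance
    // before calling readFields on it.
    WordLineKey() { }

    WordLineKey(String word, int line) {
        this.word = word;
        this.line = line;
    }

    // Serialization: write the fields in a fixed order.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(line);
    }

    // Deserialization: read the fields back in the same order.
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();
        line = in.readInt();
    }

    // Ordering used when keys are sorted: by word, then by line number.
    @Override
    public int compareTo(WordLineKey other) {
        int byWord = word.compareTo(other.word);
        return byWord != 0 ? byWord : Integer.compare(line, other.line);
    }

    @Override
    public String toString() {
        return word + "@" + line;
    }

    // Serializes a key to bytes and reads it back into a fresh instance.
    static WordLineKey roundTrip(WordLineKey key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            key.write(new DataOutputStream(bytes));
            WordLineKey copy = new WordLineKey();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new WordLineKey("hadoop", 3)));
        // prints hadoop@3
    }
}
```

A key used in a real job should also override equals and hashCode consistently with compareTo; they are omitted here to keep the sketch short.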

Input and Output types of a MapReduce job: (input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
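The type flow above can be sketched with Java generics. This is an in-memory toy with made-up names (TypeFlowSketch, run, wordsByLength), not the Hadoop API, and it skips the combine step. The example job deliberately uses different types at each stage: <line number, line> goes in, the map emits <word length, word>, and the reduce produces <word length, count>.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

class TypeFlowSketch {
    // Applies map to every <k1, v1>, groups the emitted <k2, v2> by sorted key,
    // then calls reduce once per key to produce one <k3, v3>.
    static <K1, V1, K2 extends Comparable<K2>, V2, K3, V3> List<Map.Entry<K3, V3>> run(
            List<Map.Entry<K1, V1>> input,
            BiFunction<K1, V1, List<Map.Entry<K2, V2>>> map,
            BiFunction<K2, List<V2>, Map.Entry<K3, V3>> reduce) {
        Map<K2, List<V2>> grouped = new TreeMap<>();
        for (Map.Entry<K1, V1> record : input) {
            for (Map.Entry<K2, V2> pair : map.apply(record.getKey(), record.getValue())) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }
        List<Map.Entry<K3, V3>> output = new ArrayList<>();
        for (Map.Entry<K2, List<V2>> group : grouped.entrySet()) {
            output.add(reduce.apply(group.getKey(), group.getValue()));
        }
        return output;
    }

    // Example job: counts how many words there are of each length.
    static List<Map.Entry<Integer, Long>> wordsByLength(List<String> lines) {
        List<Map.Entry<Integer, String>> input = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            input.add(new SimpleEntry<>(i, lines.get(i)));
        }
        BiFunction<Integer, String, List<Map.Entry<Integer, String>>> map = (lineNumber, line) -> {
            List<Map.Entry<Integer, String>> pairs = new ArrayList<>();
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    pairs.add(new SimpleEntry<>(word.length(), word));
                }
            }
            return pairs;
        };
        BiFunction<Integer, List<String>, Map.Entry<Integer, Long>> reduce =
            (length, words) -> new SimpleEntry<>(length, (long) words.size());
        return run(input, map, reduce);
    }

    public static void main(String[] args) {
        System.out.println(wordsByLength(Arrays.asList("a bb cc", "ddd a")));
        // prints [1=2, 2=2, 3=1]
    }
}
```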

2.3.3. Source code

The Java source code of the WordCount example is given in Appendix A. Have a look at it.

2.3.4. Map

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
    }
}

The Mapper implementation, via the map method, processes one line at a time, as provided by the specified TextInputFormat. It then splits the line into tokens separated by whitespaces, via the StringTokenizer, and emits a key-value pair of < <word>, 1>.

Considering the two following input files:

$ cat input/file01
Hello World Bye World

$ cat input/file02
Hello Hadoop Goodbye Hadoop

For the given sample input, the first map emits:

< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>

Question 9: What does the second map (i.e. the map that processes the second input file) emit?

WordCount also specifies a combiner.

job.setCombinerClass(IntSumReducer.class);

Hence, the output of each map is passed through the local combiner (which is the same as the Reducer as per the job configuration) for local aggregation, after being sorted on the keys.

The output of the first map:

< Bye, 1>
< Hello, 1>
< World, 2>
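This local aggregation can be imitated in a few lines of plain Java. The sketch is an assumption-laden stand-in (the class CombinerSketch is made up and no Hadoop class is used): it takes the keys emitted by one map, each with an implicit value of 1, and sums them per key in sorted order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CombinerSketch {
    // Sums the 1s emitted by a single map task; TreeMap keeps the keys sorted.
    static Map<String, Integer> combine(List<String> emittedWords) {
        Map<String, Integer> combined = new TreeMap<>();
        for (String word : emittedWords) {
            combined.merge(word, 1, Integer::sum);
        }
        return combined;
    }

    public static void main(String[] args) {
        // Keys emitted by the first map for "Hello World Bye World".
        System.out.println(combine(Arrays.asList("Hello", "World", "Bye", "World")));
        // prints {Bye=1, Hello=1, World=2}
    }
}
```

The combiner only sees the output of its own map, so the same word can still appear in the output of several maps; merging those is the job of the reduce step.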

Question 10: What is the output of the second map?

2.3.5. Reduce

public void reduce(Text key, Iterable<IntWritable> values, Context context)
  throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
        sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
}

The Reducer implementation, via the reduce method, just sums up the values, which are the occurrence counts for each key (i.e. words in this example).

Question 11: What is the output of the reduce considering inputs from the two previous maps?

The main method specifies various facets of the job, such as the input/output paths (passed via the command line), key/value types, input/output formats etc., in the Job. It then calls the job.waitForCompletion to submit the job and monitor its progress.

Let’s modify the WordCount example to make it more precise and more suitable to our needs.

2.4. A first Hadoop job

2.4.1. Compilation

Before modifying the source code, let’s compile and run the WordCount example source code.

Download the source from a GitHub Gist:

$ wget https://gist.githubusercontent.com/aajisaka/7146403/raw/41db4766a55423a2b67cafeb4bcef0ba1df81e00/WordCount.java

Add your Java install path to your environment variable JAVA_HOME:

$ export JAVA_HOME=`dirname $(dirname $(readlink -f $(which javac)))`
$ export PATH=${JAVA_HOME}/bin:${PATH}
$ export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar

You can also add these commands to your .bashrc to keep variables after a reboot.

Compile WordCount.java:

$ bin/hadoop com.sun.tools.javac.Main WordCount.java

If you get bad substitution warning messages such as:

hadoop-3.1.2/.../libexec/hadoop-functions.sh: line 2453:
HADOOP_COM.SUN.TOOLS.JAVAC.MAIN_OPTS: bad substitution

No worries, this is an issue already solved by the community behind Apache Hadoop, and the fix will be part of the Hadoop v. 3.3.0 release (more info here: https://issues.apache.org/jira/browse/HADOOP-16167). You can continue the lab with these warning messages.

Then create a jar:

$ jar cf wc.jar WordCount*.class

Execute the Hadoop Job with the following command:

$ bin/hadoop jar wc.jar WordCount input output

Verify that the output file of the wc.jar job is the same as the one in the WordCount example from Hadoop MapReduce.

2.4.2. Word Count 2 - Lower case

Now it is your turn to modify WordCount.

Duplicate the WordCount source code to WordCount2.java and change every WordCount to WordCount2 (classname, job configuration…).

The first modification is to convert all words of the text to lower case. Modify the code of WordCount2.java to apply this modification.

Then compile and run your WordCount2.

Question 12: How many times does the exact word you appear in the text this time?

2.4.3. Word Count 3 - Parameter case sensitive

Now we want to enable the lower case feature through a parameter.

Duplicate the WordCount2 source code to WordCount3.java and change every WordCount2 to WordCount3 (classname, job configuration…).

Modify the code of WordCount3.java to use a parameter like this:

$ bin/hadoop jar wc3.jar WordCount3 -Dwordcount.case.sensitive=false input output

Hint 1 - Code snippet of a method to retrieve a property’s value in the TokenizerMapper class, by default true:

private Boolean propertyValue;

@Override
public void setup(Context context) throws IOException, InterruptedException {
	Configuration conf = context.getConfiguration();
	propertyValue = conf.getBoolean("property.name", true);
}

Hint 2 - Code snippet to get remaining arguments:

import org.apache.hadoop.util.GenericOptionsParser;

public static void main(String[] args) throws Exception {
	Configuration conf = new Configuration();
	GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
	String[] remainingArgs = optionParser.getRemainingArgs();
	Job job = Job.getInstance(conf, "word count");
	...
	FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
	FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
}
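To see what the two hints rely on, here is a deliberately simplified stand-in for the argument handling, in plain Java. It is not Hadoop's GenericOptionsParser or Configuration: the class OptionSplitSketch and its fields are hypothetical, it only understands the -Dname=value form, and its getBoolean treats any value other than "true" as false.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OptionSplitSketch {
    final Map<String, String> properties = new HashMap<>();
    final List<String> remainingArgs = new ArrayList<>();

    // Moves every "-Dname=value" argument into properties and keeps
    // all other arguments, in order, in remainingArgs.
    OptionSplitSketch(String[] args) {
        for (String arg : args) {
            int equals = arg.indexOf('=');
            if (arg.startsWith("-D") && equals > 2) {
                properties.put(arg.substring(2, equals), arg.substring(equals + 1));
            } else {
                remainingArgs.add(arg);
            }
        }
    }

    // Returns the property as a boolean, or defaultValue when it is not set.
    boolean getBoolean(String name, boolean defaultValue) {
        String value = properties.get(name);
        return value == null ? defaultValue : Boolean.parseBoolean(value);
    }

    public static void main(String[] args) {
        OptionSplitSketch parsed = new OptionSplitSketch(
            new String[] {"-Dwordcount.case.sensitive=false", "input", "output"});
        System.out.println(parsed.remainingArgs);
        // prints [input, output]
        System.out.println(parsed.getBoolean("wordcount.case.sensitive", true));
        // prints false
    }
}
```

The point to remember is that the -D options never reach remainingArgs: once they have been moved into the configuration, the input and output paths are again at positions 0 and 1.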

Question 13: Copy/paste your final WordCount3.java source code into the document.

2.4.4. Word Count 4 - Tokens

The output file still contains unwanted punctuation characters such as ; ! ) . ”… Let’s replace them with a space character, so that the StringTokenizer splits the text more precisely.

Duplicate the WordCount3 source code to WordCount4.java and change every WordCount3 to WordCount4 (classname, job configuration…).

Below is the list of characters to replace with a space:

"!", ".", ",", ")", "_", "?", "“", "”", "--", ";", "'", "(", "*", ":"

Modify the code of WordCount4.java to remove unwanted characters.

Then compile and run your WordCount4 with no case sensitivity.

Question 14: Finally, how many times does the word you appear in the text?

2.5. Pseudo-Distributed Operation

Hadoop can also be run on a single-node in a pseudo-distributed mode where each Hadoop daemon runs in a separate Java process.

Until now, input and output files were on the local filesystem (ext4). Let’s use the Hadoop Distributed File System (HDFS).

2.5.1. Configuration

Edit the following configuration files with your text editor:

etc/hadoop/core-site.xml:

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>

etc/hadoop/hdfs-site.xml (note the replication factor):

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

2.5.2. Setup passphraseless ssh

Now check that you can ssh to the localhost without a passphrase (reply yes and type your user password):

$ ssh localhost

By default, you cannot ssh to localhost without a passphrase. To do so, execute the following commands:

$ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
$ chmod 0600 ~/.ssh/authorized_keys

Then set the pdsh remote command type (rcmd) to ssh by exporting the following variable (you can add it to your .bashrc):

$ export PDSH_RCMD_TYPE=ssh

2.5.3. Downloading more books

Add the following books from Project Gutenberg to your input files:

The Count of Monte Cristo by Alexandre Dumas

$ wget https://www.gutenberg.org/files/1184/1184-0.txt -P input

Les Misérables by Victor Hugo

$ wget https://www.gutenberg.org/files/135/135-0.txt -P input

2.5.4. Execution

The following instructions run a MapReduce job locally.

Format the HDFS filesystem:

$ bin/hdfs namenode -format

Start NameNode daemon and DataNode daemon:

$ sbin/start-dfs.sh

The hadoop daemon log output is written to the $HADOOP_LOG_DIR directory (defaults to $HADOOP_HOME/logs).

Verify that the NameNode and the DataNode are up by browsing the web interface for the NameNode on your Ubuntu virtual machine. By default it is available at:

NameNode - http://localhost:9870/

On the Datanodes tab, click on the HTTP Address of your node.

Question 15: What is the block’s max size in your Block Pools?

Make the HDFS directories required to execute MapReduce jobs (replace <username> with your current username):

$ bin/hdfs dfs -mkdir /user
$ bin/hdfs dfs -mkdir /user/<username>

Question 16: Explain each argument of the first command bin/hdfs dfs -mkdir /user:

  • bin/hdfs: …
  • dfs: …
  • -mkdir: …
  • /user: …

Copy the input files into the distributed filesystem:

$ bin/hdfs dfs -mkdir input
$ bin/hdfs dfs -put input/74-0.txt input
$ bin/hdfs dfs -put input/1184-0.txt input
$ bin/hdfs dfs -put input/135-0.txt input

Run your WordCount4 MapReduce job:

$ bin/hadoop jar wc4.jar WordCount4 -Dwordcount.case.sensitive=false input output

Examine the output files in two different ways. View the output files on the distributed filesystem:

$ bin/hdfs dfs -cat output/*

Then copy the output files from the distributed filesystem to the local filesystem and examine them:

$ bin/hdfs dfs -get output output
$ cat output/*

When you are done, stop the daemons with:

$ sbin/stop-dfs.sh

Question 17: How many times does the word what appear in the text?

2.6. YARN on a Single Node

A MapReduce job can run on YARN in a pseudo-distributed mode by setting a few parameters and running ResourceManager daemon and NodeManager daemon in addition.

Start NameNode daemon and DataNode daemon:

$ sbin/start-dfs.sh

Edit the following configuration files with your text editor:

etc/hadoop/mapred-site.xml:

<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
  <property>
    <name>mapreduce.application.classpath</name>
    <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
  </property>
</configuration>

etc/hadoop/yarn-site.xml:

<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.env-whitelist</name>
    <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
  </property>
</configuration>

Start ResourceManager daemon and NodeManager daemon:

$ sbin/start-yarn.sh

Browse the web interface for the ResourceManager. By default it is available at:

ResourceManager - http://localhost:8088/

Run your WordCount4 MapReduce job:

$ bin/hadoop jar wc4.jar WordCount4 -Dwordcount.case.sensitive=true input output

You may get an error message like the following one:

INFO mapreduce.Job: Task Id : ..., Status : FAILED

Container [...] is running 384657920B beyond the 'VIRTUAL' memory limit. Current usage: 378.7 MB of 1 GB physical memory used; 2.5 GB of 2.1 GB virtual memory used. Killing container.

Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143.

Question 18: What is the cause of the error message?

Question 19: Find two potential solutions to this problem.

Try to run your WordCount4 MapReduce job again.

Browse the finished task on the web interface (Cluster > Applications > FINISHED).

Question 20: What is the FinalStatus of the tasks?

You can copy the output files from the distributed filesystem to the local filesystem to examine them.

When you’re done, stop the daemons with:

$ sbin/stop-yarn.sh
$ sbin/stop-dfs.sh

3. Spark

Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

3.1. Download

To start with Spark, download the latest stable version (not the preview one) pre-built for Hadoop 2.7, from the official website (as seen before in the lab):

https://spark.apache.org/downloads.html

Then extract the archive and move into it from a terminal (as seen before in the lab).

3.2. Examples

Spark comes with several sample programs. Scala, Java, Python and R examples are in the examples/src/main directory. To run one of the Java or Scala sample programs, use bin/run-example <class> [params] in the top-level Spark directory. For example:

$ ./bin/run-example SparkPi 10

Execute the above command.

Question 21: What does the above example do?

Question 22: Among all the displayed lines, copy/paste the one that gives the result in your document.

Change the last parameter to a different value, such as 50, and run the example again.

Question 23: What is this last parameter for and what does TID stand for?

3.3. Spark Standalone Mode

Spark provides a simple standalone deploy mode (in addition to running on the Mesos or YARN cluster managers). You can launch a standalone cluster either manually, by starting a master and workers by hand, or by using the provided launch scripts.

You can start a standalone master server by executing:

$ ./sbin/start-master.sh

Once started, the master gets a spark://HOST:PORT URL for itself, which you can use to connect workers to it. You can find this URL on the master’s web UI, which is http://localhost:8080 by default.

Question 24: What is the URL of your master server?

Similarly, you can start one or more workers and connect them to the master via:

$ ./sbin/start-slave.sh <master-spark-URL>

Once you have started a worker, go back to the master’s web UI (http://localhost:8080 by default). You should see the new node listed there, along with its number of CPUs and memory (minus one gigabyte left for the OS).

3.4. Scala shell

Let’s run Spark interactively through a modified version of the Scala shell, to communicate with our standalone cluster.

To run an interactive Spark shell against the cluster, execute the following command with the previous URL of the master server:

$ ./bin/spark-shell --master <master-spark-URL>

You should see the Spark logo appear.

Go back to the master’s web UI (http://localhost:8080 by default). You should see a new running application.

Question 25: What is the name of the running application?

In the Spark shell, type :help to get an overview of a few shell commands.

3.4.1. Basics

Spark’s primary abstraction is a distributed collection of items called a Dataset. Datasets can be created from Hadoop InputFormats (such as HDFS files) or by transforming other Datasets. Let’s make a new Dataset from the text of the README file in the Spark source directory:

scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

You can get values from Dataset directly, by calling some actions, or transform the Dataset to get a new one.

scala> textFile.count() // Number of items in this Dataset
res0: Long = 104

scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark

Now let’s transform this Dataset into a new one. We call filter to return a new Dataset with a subset of the items in the file.

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

We can chain together transformations and actions:

scala> textFile.filter(line => line.contains("Spark")).count()

Question 26: What does this last command do?

3.4.2. More on dataset operations

Dataset actions and transformations can be used for more complex computations. Let’s say we want to find the line with the most words:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Int = 16

This first maps a line to an integer value, creating a new Dataset. reduce is called on that Dataset to find the largest word count. The arguments to map and reduce are Scala function literals (closures), and can use any language feature or Scala/Java library. For example, we can easily call functions declared elsewhere. We’ll use Math.max() function to make this code easier to understand:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 16
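For comparison, the same map-then-reduce shape can be written with plain Java streams. This is only an analogy to help read the Scala line: it runs on a local list rather than on a Spark Dataset, and the class LongestLineSketch and its method are made up.

```java
import java.util.Arrays;
import java.util.List;

class LongestLineSketch {
    // Maps each line to its number of space-separated words,
    // then reduces these numbers to their maximum.
    static int maxWordsPerLine(List<String> lines) {
        return lines.stream()
                .map(line -> line.split(" ").length)
                .reduce(0, (a, b) -> Math.max(a, b));
    }

    public static void main(String[] args) {
        System.out.println(maxWordsPerLine(Arrays.asList("one two three", "four", "five six")));
        // prints 3
    }
}
```

In both versions the function passed to reduce combines two partial results into one, which is why it can be applied in any order and, in Spark, on different nodes.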

Spark can easily implement the MapReduce data flow pattern:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

Here, we call flatMap to transform a Dataset of lines to a Dataset of words, and then combine groupByKey and count to compute the per-word counts in the file as a Dataset of (String, Long) pairs. To collect the word counts in our shell, we can call collect:

scala> wordCounts.collect()
res6: Array[(String, Long)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
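The flatMap, group and count steps have a close counterpart in plain Java streams. Again this is an analogy on a local list, not Spark code, and the class StreamWordCountSketch is a made-up name; a TreeMap is used only so that the printed result has a predictable order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

class StreamWordCountSketch {
    // flatMap turns the lines into one stream of words;
    // groupingBy(identity) groups equal words and counting() counts each group.
    static Map<String, Long> wordCounts(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(wordCounts(Arrays.asList("a b a", "b a")));
        // prints {a=3, b=2}
    }
}
```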

As you can see, the previous code snippets implement a WordCount in a few lines, compared to the Hadoop WordCount seen earlier in the lab.

Question 27: Write a Scala line to get the most used word.

Hint 1: One way of accessing tuple elements in Scala is by position. The individual elements are named _1, _2, and so forth.

val people = ("Bob", 25)
println(people._1) // Bob
println(people._2) // 25

Hint 2: you can use a reduce function with an if to compare elements.

Question 28: After executing your Scala line, how many times does the most used word appear?

The most used word should be strange, right? Quite hard to pronounce…

Question 29: Modify your Scala line to keep only non-empty words and write it in your document.

Question 30: Execute your Scala line and write the displayed result in your document.

3.4.3. Caching

Spark also supports pulling data sets into a cluster-wide in-memory cache. This is very useful when data is accessed repeatedly, such as when querying a small “hot” dataset or when running an iterative algorithm like PageRank. As a simple example, let’s mark our linesWithSpark dataset to be cached:

scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15

It may seem silly to use Spark to explore and cache a 100-line text file. The interesting part is that these same functions can be used on very large data sets, even when they are striped across tens or hundreds of nodes. You can also do this interactively by connecting bin/spark-shell to a cluster.

3.5. Submitting a job

The spark-submit script in Spark’s bin directory is used to launch applications on a cluster.

The following command submits the SparkPi example job to the standalone cluster:

$ ./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master <master-spark-URL> \
  examples/jars/spark-examples_2.11-2.4.5.jar \
  100

3.6. Self-Contained Applications

Suppose we wish to write a self-contained application using the Spark API.

This example will use Maven to compile an application JAR:

$ sudo apt install maven

We’ll create a very simple Spark application, SimpleApp.java:

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
    Dataset<String> logData = spark.read().textFile(logFile).cache();

    long numAs = logData.filter(s -> s.contains("a")).count();
    long numBs = logData.filter(s -> s.contains("b")).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

    spark.stop();
  }
}

This program just counts the number of lines containing ‘a’ and the number containing ‘b’ in the Spark README. Note that you’ll need to replace YOUR_SPARK_HOME with the location where Spark is installed. Unlike the earlier examples with the Spark shell, which initializes its own SparkSession, we initialize a SparkSession as part of the program.

To build the program, we also write a Maven pom.xml file that lists Spark as a dependency. Note that Spark artifacts are tagged with a Scala version.

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>2.4.5</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
</project>

We lay out these files according to the canonical Maven directory structure:

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

Now, we can package the application using Maven and execute it with ./bin/spark-submit.

# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master <master-spark-URL> \
  target/simple-project-1.0.jar
...

Question 31: How many lines with a? How many lines with b?

Question 32: Write a WordCount in Java and execute it as a self-contained Spark application.

3.7. Launching Spark on YARN

Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the Hadoop cluster. These configs are used to write to HDFS and connect to the YARN ResourceManager. The configuration contained in this directory will be distributed to the YARN cluster so that all containers used by the application use the same configuration. If the configuration references Java system properties or environment variables not managed by YARN, they should also be set in the Spark application’s configuration (driver, executors, and the AM when running in client mode).

There are two deploy modes that can be used to launch Spark applications on YARN. In cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.

Unlike other cluster managers supported by Spark in which the master’s address is specified in the --master parameter, in YARN mode the ResourceManager’s address is picked up from the Hadoop configuration. Thus, the --master parameter is yarn.

To launch a Spark application in cluster mode:

$ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]

For example:

$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 4g \
    --executor-memory 2g \
    --executor-cores 1 \
    --queue thequeue \
    examples/jars/spark-examples*.jar \
    10

The above starts a YARN client program which starts the default Application Master. Then SparkPi will be run as a child thread of Application Master. The client will periodically poll the Application Master for status updates and display them in the console. The client will exit once your application has finished running.

To launch a Spark application in client mode, do the same, but replace cluster with client. The following shows how you can run spark-shell in client mode:

$ ./bin/spark-shell --master yarn --deploy-mode client

In cluster mode, the driver runs on a different machine than the client, so SparkContext.addJar won’t work out of the box with files that are local to the client. To make files on the client available to SparkContext.addJar, include them with the --jars option in the launch command.

$ ./bin/spark-submit --class my.main.Class \
    --master yarn \
    --deploy-mode cluster \
    --jars my-other-jar.jar,my-other-other-jar.jar \
    my-main-jar.jar \
    app_arg1 app_arg2

Question 33: Launch your Java WordCount on YARN.

Question 34: Write another program, such as a LetterCount (which counts the occurrences of each letter in a text, useful to find the shift key of a Caesar cipher for example), in Scala and launch it on YARN.


Appendix A - WordCount source code

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Appendix B - Details about Hadoop MapReduce

MapReduce Job

A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file-system. The framework takes care of scheduling tasks, monitoring them and re-executing the failed tasks.

Typically the compute nodes and the storage nodes are the same, that is, the MapReduce framework and the Hadoop Distributed File System (see the HDFS Architecture Guide) are running on the same set of nodes. This configuration allows the framework to effectively schedule tasks on the nodes where data is already present, resulting in very high aggregate bandwidth across the cluster.

The MapReduce framework consists of a single master ResourceManager, one worker NodeManager per cluster-node, and one MRAppMaster per application (see the YARN Architecture Guide).

Minimally, applications specify the input/output locations and supply map and reduce functions via implementations of appropriate interfaces and/or abstract-classes. These, and other job parameters, comprise the job configuration.

The Hadoop job client then submits the job (jar/executable etc.) and configuration to the ResourceManager which then assumes the responsibility of distributing the software/configuration to the workers, scheduling tasks and monitoring them, providing status and diagnostic information to the job-client.

Although the Hadoop framework is implemented in Java, MapReduce applications need not be written in Java: Hadoop Streaming runs jobs with any executable (e.g. shell utilities) as the mapper or reducer, and Hadoop Pipes is a SWIG-compatible C++ API.

Applications typically implement the Mapper and Reducer interfaces to provide the map and reduce methods. These form the core of the job.

Mapper

Mapper maps input key/value pairs to a set of intermediate key/value pairs.

Maps are the individual tasks that transform input records into intermediate records. The transformed intermediate records do not need to be of the same type as the input records. A given input pair may map to zero or many output pairs.

The Hadoop MapReduce framework spawns one map task for each InputSplit generated by the InputFormat for the job.

Overall, mapper implementations are passed to the job via the Job.setMapperClass(Class) method. The framework then calls map(WritableComparable, Writable, Context) for each key/value pair in the InputSplit for that task. Applications can then override the cleanup(Context) method to perform any required cleanup.

Output pairs are collected with calls to context.write(WritableComparable, Writable).

Applications can use the Counter to report their statistics.

All intermediate values associated with a given output key are subsequently grouped by the framework, and passed to the Reducer(s) to determine the final output. Users can control the grouping by specifying a Comparator via Job.setGroupingComparatorClass(Class).

The Mapper outputs are sorted and then partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job. Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.

Users can optionally specify a combiner, via Job.setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.
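
To see why a combiner reduces the data sent to the reducers, here is a minimal sketch in plain Java (no Hadoop dependency). The class CombinerSketch and its methods are hypothetical names used for illustration only; they mimic what TokenizerMapper and IntSumReducer (used as a combiner) do in the WordCount of Appendix A, for the output of a single mapper.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class: simulates one mapper followed by a combiner.
class CombinerSketch {

    /** Emits one (word, 1) pair per whitespace-separated token, like TokenizerMapper. */
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String token : line.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                out.add(new SimpleEntry<>(token, 1));
            }
        }
        return out;
    }

    /** Sums the counts per word on the mapper side, like IntSumReducer used as a combiner. */
    static Map<String, Integer> combine(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> combined = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            combined.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return combined;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapped = map("to be or not to be");
        Map<String, Integer> combined = combine(mapped);
        System.out.println(mapped.size() + " pairs before combining");   // 6 pairs
        System.out.println(combined.size() + " pairs after combining: " + combined);
        // 4 pairs after combining: {be=2, not=1, or=1, to=2}
    }
}
```

Summing is associative and commutative, which is why the same class can serve as both combiner and reducer in WordCount; an operation such as an average could not be reused this way without changes.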

The intermediate, sorted outputs are always stored in a simple (key-len, key, value-len, value) format. Applications can control if, and how, the intermediate outputs are to be compressed and the CompressionCodec to be used via the Configuration.

How many maps?

The number of maps is usually driven by the total size of the inputs, that is, the total number of blocks of the input files.

The right level of parallelism for maps seems to be around 10-100 maps per-node, although it has been set up to 300 maps for very cpu-light map tasks. Task setup takes a while, so it is best if the maps take at least a minute to execute.

Thus, if you expect 10TB of input data and have a blocksize of 128MB, you’ll end up with 82,000 maps, unless Configuration.set(MRJobConfig.NUM_MAPS, int) (which only provides a hint to the framework) is used to set it even higher.
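
The figure above can be checked with a short calculation. The following is a minimal sketch in plain Java, assuming that each block becomes exactly one InputSplit and that sizes use binary units (1 MB = 1024 × 1024 bytes); MapCountEstimate is a hypothetical helper, not part of Hadoop.

```java
// Hypothetical helper: estimates the number of map tasks as one per input block.
class MapCountEstimate {

    static final long MB = 1024L * 1024L;
    static final long TB = MB * 1024L * 1024L;

    /** Number of blocks needed to hold inputBytes, rounding up the last partial block. */
    static long estimateMaps(long inputBytes, long blockSizeBytes) {
        if (blockSizeBytes <= 0) {
            throw new IllegalArgumentException("block size must be positive");
        }
        return (inputBytes + blockSizeBytes - 1) / blockSizeBytes;
    }

    public static void main(String[] args) {
        long maps = estimateMaps(10 * TB, 128 * MB);
        System.out.println(maps); // 81920, i.e. roughly 82,000 maps
    }
}
```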

Reducer

Reducer reduces a set of intermediate values which share a key to a smaller set of values.

The number of reduces for the job is set by the user via Job.setNumReduceTasks(int).

Reducer implementations are passed to the job via the Job.setReducerClass(Class) method and can override the setup(Context) method to initialize themselves. The framework then calls the reduce(WritableComparable, Iterable<Writable>, Context) method for each <key, (list of values)> pair in the grouped inputs. Applications can then override the cleanup(Context) method to perform any required cleanup.

Reducer has 3 primary phases: shuffle, sort and reduce.

Shuffle

Input to the Reducer is the sorted output of the mappers. In this phase the framework fetches the relevant partition of the output of all the mappers, via HTTP.

Sort

The framework groups Reducer inputs by keys (since different mappers may have output the same key) in this stage.

The shuffle and sort phases occur simultaneously; while map-outputs are being fetched they are merged.
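
The effect of the shuffle and sort phases can be pictured with a small simulation in plain Java (no Hadoop dependency). This is only a sketch: ShuffleSortSketch, pair and groupByKey are hypothetical names, everything happens in memory on one machine, and a TreeMap stands in for the merge of sorted map outputs.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical illustration class: groups the outputs of several mappers by key.
class ShuffleSortSketch {

    /** Convenience factory for one intermediate (key, value) pair. */
    static Map.Entry<String, Integer> pair(String key, int value) {
        return new SimpleEntry<>(key, value);
    }

    /** Merges the outputs of all mappers into key -> list of values, keys in sorted order. */
    static SortedMap<String, List<Integer>> groupByKey(
            List<List<Map.Entry<String, Integer>>> mapperOutputs) {
        SortedMap<String, List<Integer>> grouped = new TreeMap<>();
        for (List<Map.Entry<String, Integer>> output : mapperOutputs) {
            for (Map.Entry<String, Integer> p : output) {
                grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
            }
        }
        return grouped;
    }

    public static void main(String[] args) {
        // Two mappers emitted the same key "hello" independently.
        List<Map.Entry<String, Integer>> mapper1 = Arrays.asList(pair("hello", 1), pair("world", 1));
        List<Map.Entry<String, Integer>> mapper2 = Arrays.asList(pair("hello", 1), pair("hadoop", 1));

        SortedMap<String, List<Integer>> grouped = groupByKey(Arrays.asList(mapper1, mapper2));
        System.out.println(grouped); // {hadoop=[1], hello=[1, 1], world=[1]}

        // Each entry corresponds to one reduce call: <key, (list of values)>.
        for (Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
            int sum = 0;
            for (int value : group.getValue()) {
                sum += value;
            }
            System.out.println(group.getKey() + "\t" + sum);
        }
    }
}
```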

Secondary sort

If equivalence rules for grouping the intermediate keys are required to be different from those for grouping keys before reduction, then one may specify a Comparator via Job.setSortComparatorClass(Class). Since Job.setGroupingComparatorClass(Class) can be used to control how intermediate keys are grouped, these can be used in conjunction to simulate secondary sort on values.
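
The idea can be sketched in plain Java (no Hadoop dependency). In this hypothetical example, the value to be sorted is moved into a composite key; one comparator orders the full composite key (the role played by the sort comparator) and another compares only the natural key (the role played by the grouping comparator). All names below are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class: simulates secondary sort with two comparators.
class SecondarySortSketch {

    /** Composite key: the natural key plus the value we want sorted inside each group. */
    static final class CompositeKey {
        final String naturalKey;
        final int value;

        CompositeKey(String naturalKey, int value) {
            this.naturalKey = naturalKey;
            this.value = value;
        }
    }

    // Role of the sort comparator: total order on (naturalKey, value).
    static final Comparator<CompositeKey> SORT =
            Comparator.comparing((CompositeKey k) -> k.naturalKey).thenComparingInt(k -> k.value);

    // Role of the grouping comparator: only the natural key decides group membership.
    static final Comparator<CompositeKey> GROUP =
            Comparator.comparing((CompositeKey k) -> k.naturalKey);

    /** Sorts with SORT, then starts a new group whenever GROUP sees a different key. */
    static Map<String, List<Integer>> sortAndGroup(List<CompositeKey> keys) {
        List<CompositeKey> sorted = new ArrayList<>(keys);
        sorted.sort(SORT);

        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        CompositeKey previous = null;
        List<Integer> current = null;
        for (CompositeKey key : sorted) {
            if (previous == null || GROUP.compare(previous, key) != 0) {
                current = new ArrayList<>();
                groups.put(key.naturalKey, current);
            }
            current.add(key.value);
            previous = key;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<CompositeKey> keys = Arrays.asList(
                new CompositeKey("b", 3), new CompositeKey("a", 2),
                new CompositeKey("b", 1), new CompositeKey("a", 1));
        System.out.println(sortAndGroup(keys)); // {a=[1, 2], b=[1, 3]}
    }
}
```

Each group corresponds to one reduce call, and the values inside it arrive already sorted, which is the effect secondary sort aims for.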

Reduce

In this phase the reduce(WritableComparable, Iterable<Writable>, Context) method is called for each <key, (list of values)> pair in the grouped inputs.

The output of the reduce task is typically written to the FileSystem via Context.write(WritableComparable, Writable).

Applications can use the Counter to report their statistics.

The output of the Reducer is not sorted.

How many reduces?

The right number of reduces seems to be 0.95 or 1.75 multiplied by (<no. of nodes> * <no. of maximum containers per node>).

With 0.95 all of the reduces can launch immediately and start transferring map outputs as the maps finish. With 1.75 the faster nodes will finish their first round of reduces and launch a second wave of reduces doing a much better job of load balancing.

Increasing the number of reduces increases the framework overhead, but increases load balancing and lowers the cost of failures.

The scaling factors above are slightly less than whole numbers to reserve a few reduce slots in the framework for speculative-tasks and failed tasks.
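
As a worked example of the rule above, here is a minimal sketch in plain Java. ReduceCountEstimate is a hypothetical helper; the cluster size (10 nodes, 8 containers per node) is made up for illustration, and rounding the result down is an assumption, since the rule does not say how to round.

```java
// Hypothetical helper: applies the 0.95 / 1.75 rule of thumb for the number of reduces.
class ReduceCountEstimate {

    /**
     * factorPercent is the scaling factor in hundredths (95 for 0.95, 175 for 1.75),
     * so that the computation stays in exact integer arithmetic.
     */
    static int estimateReduces(int factorPercent, int nodes, int maxContainersPerNode) {
        long slots = (long) nodes * maxContainersPerNode;
        return (int) (factorPercent * slots / 100); // rounds down (assumption)
    }

    public static void main(String[] args) {
        int nodes = 10;                // made-up cluster size
        int maxContainersPerNode = 8;  // made-up container limit

        // One wave: all reduces can start as soon as the maps finish.
        System.out.println(estimateReduces(95, nodes, maxContainersPerNode));  // 76
        // Two waves: faster nodes pick up a second round of reduces.
        System.out.println(estimateReduces(175, nodes, maxContainersPerNode)); // 140
    }
}
```

The resulting number would then be passed to Job.setNumReduceTasks(int).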

Reducer none

It is legal to set the number of reduce-tasks to zero if no reduction is desired.

In this case the outputs of the map-tasks go directly to the FileSystem, into the output path set by FileOutputFormat.setOutputPath(Job, Path). The framework does not sort the map-outputs before writing them out to the FileSystem.

Partitioner

Partitioner partitions the key space.

Partitioner controls the partitioning of the keys of the intermediate map-outputs. The key (or a subset of the key) is used to derive the partition, typically by a hash function. The total number of partitions is the same as the number of reduce tasks for the job. Hence this controls which of the m reduce tasks the intermediate key (and hence the record) is sent to for reduction.

HashPartitioner is the default Partitioner.
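
The arithmetic behind HashPartitioner can be reproduced in plain Java. This is a sketch under stated assumptions: HashPartitionSketch and partitionFor are hypothetical names, and the keys here are Java String objects, whose hashCode() differs from that of Hadoop's Text, so the partition numbers on a real cluster will not match the ones printed below.

```java
// Hypothetical illustration class: same arithmetic as the default hash partitioning.
class HashPartitionSketch {

    /** Clears the sign bit of the hash code, then takes the remainder by the number of reduces. */
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 3;
        for (String word : new String[] {"a", "b", "c", "a"}) {
            // The same key always lands in the same partition, hence on the same reducer.
            System.out.println(word + " -> partition " + partitionFor(word, numReduceTasks));
        }
        // a -> partition 1
        // b -> partition 2
        // c -> partition 0
        // a -> partition 1
    }
}
```

Masking with Integer.MAX_VALUE keeps the result non-negative even when hashCode() is negative, so the partition index always falls between 0 and numReduceTasks - 1.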

Counter

Counter is a facility for MapReduce applications to report their statistics.

Mapper and Reducer implementations can use the Counter to report statistics.

Hadoop MapReduce comes bundled with a library of generally useful mappers, reducers, and partitioners.


Arnaud Favier

Entrepreneur, cofounder & CTO at Flambo, PhD in computer science and software engineer
