Big Data Tutorial 3: Introduction to Spark
 Today, Spark is being adopted by major players like Amazon, eBay, and Yahoo! Many organizations run Spark on clusters with thousands of nodes. According to the Spark FAQ, the largest known cluster has over 8000 nodes. Indeed, Spark is a technology well worth taking note of and learning about.

This tutorial provides an introduction to Spark and practical knowledge for working with it. It draws on material from the Apache Spark website as well as the book Learning Spark: Lightning-Fast Big Data Analysis.

Spark on Hadoop: 

Spark is a framework for performing general data analytics on a distributed computing cluster such as Hadoop. It provides in-memory computation, which greatly speeds up data processing compared to MapReduce. Spark runs on top of an existing Hadoop cluster and can access the Hadoop data store (HDFS); it can also process structured data in Hive and streaming data from HDFS, Flume, Kafka, and Twitter. Refer to Big Data Tutorial 1 to learn more about Hadoop.


What is Apache Spark? An Introduction

Spark is an Apache project advertised as “lightning fast cluster computing”. It has a thriving open-source community and is the most active Apache project at the moment.

Spark provides a faster and more general data processing platform. Spark lets you run programs up to 100x faster in memory, or 10x faster on disk, than Hadoop. Last year, Spark overtook Hadoop by completing the 100 TB Daytona GraySort contest 3x faster on one tenth the number of machines, and it also became the fastest open source engine for sorting a petabyte.

Spark also makes it possible to write code more quickly, as you have over 80 high-level operators at your disposal. To demonstrate this, let’s have a look at the “Hello World!” of big data: the Word Count example. Written in Java for MapReduce it has around 50 lines of code, whereas in Spark (and Scala) you can do it as simply as this:

 

sparkContext.textFile("hdfs://...")
            .flatMap(line => line.split(" "))
            .map(word => (word, 1)).reduceByKey(_ + _)
            .saveAsTextFile("hdfs://...")

 

Is there any point in learning MapReduce, then?

A: Yes, for the following reasons: 

  1. MapReduce is a paradigm used by many big data tools, including Spark. So, understanding the MapReduce paradigm and how to convert a problem into a series of MR tasks is very important.

  2. When the data grows beyond what can fit into the memory of your cluster, the Hadoop MapReduce paradigm is still very relevant.

  3. Almost every other tool, such as Hive or Pig, converts its queries into MapReduce phases. If you understand MapReduce, you will be able to optimize your queries better.

 

 

When do you use Apache Spark? OR: What are the benefits of Spark over MapReduce?

A:

  1. Spark is really fast. As per its claims, it runs programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk. It makes good use of RAM to produce results faster.

  2. In the MapReduce paradigm, you write many MapReduce tasks and then tie these tasks together using Oozie or shell scripts. This mechanism is very time consuming, and MapReduce tasks have heavy latency.

  3. Quite often, translating the output of one MR job into the input of another MR job requires writing additional code, because Oozie may not suffice.

  4. In Spark, you can basically do everything from a single application or console (the pyspark or Scala console) and get the results immediately. Switching between 'running something on the cluster' and 'doing something locally' is fairly easy and straightforward. This also means less context switching for the developer and more productivity.

  5. Spark is, roughly speaking, MapReduce and Oozie put together.

 

Additional key features of Spark include:

  • Currently provides APIs in Scala, Java, and Python, with support for other languages (such as R) on the way.
  • Integrates well with the Hadoop ecosystem and data sources (HDFS, Amazon S3, Hive, HBase, Cassandra, etc.)
  • Can run on clusters managed by Hadoop YARN or Apache Mesos, and also run standalone. On dumbo it's managed by YARN.

The Spark core is complemented by a set of powerful, higher-level libraries which can be seamlessly used in the same application. These libraries currently include SparkSQL, Spark Streaming, MLlib (for machine learning), and GraphX.


 

Spark Core

Spark Core is the base engine for large-scale parallel and distributed data processing. It is responsible for:

  • memory management and fault recovery
  • scheduling, distributing and monitoring jobs on a cluster
  • interacting with storage systems


Spark introduces the concept of an RDD (Resilient Distributed Dataset).

 

What is an RDD - Resilient Distributed Dataset?

A:

An RDD is a representation of data distributed across the nodes of a cluster, and it is

  1. Immutable - You can operate on an RDD to produce another RDD, but you can’t alter it.

  2. Partitioned / Parallel - The data in an RDD is partitioned and operated on in parallel. Any operation on an RDD uses multiple nodes.

  3. Resilient - If one of the nodes hosting a partition fails, another node takes over its data.
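
As a quick illustration of these properties, here is a minimal sketch for the Scala shell (spark-shell); it assumes only the shell's built-in SparkContext sc:

// Distribute a local collection across 3 partitions
val nums = sc.parallelize(1 to 10, 3);

// A transformation returns a new RDD; nums itself is never altered (immutability)
val doubled = nums.map(_ * 2);

// The partitions are processed in parallel, and a lost partition can be
// recomputed from its lineage if a node fails (resilience)
doubled.partitions.length; // 3
doubled.collect(); // Array(2, 4, 6, ..., 20)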

 

Main Primitives - RDDs support two types of operations: 
  1. Transformations 
  2. Actions

 

What are Transformations?

A: Transformations are functions that are applied to an RDD (resilient distributed dataset). A transformation results in another RDD. A transformation is not executed until an action follows.

Examples of transformations are:

  1. map() - applies the function passed to it to each element of the RDD, resulting in a new RDD.

  2. filter() - creates a new RDD by picking the elements from the current RDD that pass the function argument.

 

 

What are Actions?

A: An action brings the data from the RDD back to the local machine. Execution of an action triggers all of the previously created transformations. Examples of actions are:

  1. reduce() - executes the function passed to it again and again until only one value is left. The function should take two arguments and return one value.

  2. take(n) - brings the first n values of the RDD back to the local node.

 

Transformations in Spark are “lazy”, meaning that they do not compute their results right away. Instead, they just “remember” the operation to be performed and the dataset (e.g., a file) on which the operation is to be performed. The transformations are only actually computed when an action is called and the result is returned to the driver program. This design enables Spark to run more efficiently. For example, if a big file was transformed in various ways and passed to the first() action, Spark would only process and return the result for the first line, rather than do the work for the entire file.

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist or cache method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it.
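
A small spark-shell sketch of this behavior (an illustrative sketch; it assumes only the shell's built-in SparkContext sc and an input file of your choosing):

// textFile() and filter() are lazy transformations - nothing is read or computed yet
val lines = sc.textFile("/user/<net id>/<input file>");
val errors = lines.filter(line => line.contains("ERROR"));

// Keep the filtered RDD in memory so later actions reuse it instead of re-reading the file
errors.cache();

// The first action triggers the actual computation...
errors.count();

// ...and subsequent actions run against the cached data
errors.take(5);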

 

Hands on Exercise

Steps to connect to the Hadoop cluster, i.e., dumbo.

 

Windows:
Step - 1: Use hostname "gw.hpc.nyu.edu" with port 22 in PuTTY. Provide your credentials.
Step - 2: Then, use "ssh <net id>@dumbo.hpc.nyu.edu". It will connect to the dumbo cluster.
Mac:
Step - 1: From the terminal, run "ssh <net id>@gw.hpc.nyu.edu".
Step - 2: Then, use "ssh <net id>@dumbo.hpc.nyu.edu". It will connect to the dumbo cluster.

 

Interactive Spark Shell:

Spark provides an interactive shell, which gives a way to learn the API as well as to analyze data sets interactively. Run the following command on the dumbo cluster to connect to the Spark shell.

spark-shell

 

Creating RDD using SparkContext

SparkContext represents the connection to a Spark execution environment. In fact, the main entry point to Spark functionality is the SparkContext. You have to create a SparkContext before using Spark features and services in your application. A SparkContext can be used to create RDDs, access Spark services, and run jobs.

// Turn a Scala collection into an RDD
sc.parallelize(List(1, 2, 3));


// Load text file from Local file system
sc.textFile("/home/<net id>/<input file>");


// Load text file from hadoop i.e., HDFS
sc.textFile("/user/<net id>/<input file>");

 

Basic Transformations:

 

val nums = sc.parallelize(List(1,2,3));

// Pass each element through a function
val squares = nums.map(x => x*x); //{1, 4, 9}

// Keep elements passing a predicate
val even = squares.filter(_ % 2 == 0); //{4}

// Map each element to zero or more others
nums.flatMap(x => 1 to x); //{1,1,2,1,2,3}

 

Basic Actions:

 

val nums = sc.parallelize(List(1,2,3));

// Retrieve RDD contents as a local collection
nums.collect();

// Return first K elements
nums.take(2);

// Count number of elements
nums.count();

// merge elements with an associative function
nums.reduce(_ + _);

// Write elements to a text file
nums.saveAsTextFile("/user/<net id>/output");

 

 

Working with Key-Value pairs/operations:

val pets = sc.parallelize(List(("cat", 1), ("dog", 1), ("cat", 2)));

// reduceByKey also automatically implements combiners on the map side
val redu = pets.reduceByKey(_ + _);
redu.collect().foreach(println);

val srt = pets.sortByKey();
srt.collect().foreach(println);

val grp = pets.groupByKey();
grp.collect().foreach(println);

Other Operations: Joins/grouping

val visits = sc.parallelize(List(
("index.html", "1.2.3.4"),
("about.html", "3.4.5.6"),
("index.html", "1.3.3.1")));
val pageNames = sc.parallelize(List(
("index.html","Home"),
("about.html", "About")));

visits.join(pageNames)
visits.join(pageNames).collect().foreach(println(_));
visits.cogroup(pageNames).collect().foreach(println(_));


Loading data from a local file into an RDD.

-bash-4.1$ spark-shell --master local

scala> val file = sc.textFile("file:/home/<net_id>/<input_file>");

scala> file.count();

 

EXAMPLE 1: Processing a word count task through the interactive Spark shell using an input file from HDFS.

Copy input file to HDFS 

hadoop fs -put /share/apps/Tutorials/Tutorial3/input/animals.txt /user/<net_id>/

Log in to the interactive Spark shell using the command "spark-shell".

-bash-4.1$ spark-shell

val file = sc.textFile("/user/<net id>/<input file>");

val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _);

counts.collect().foreach(println);

 

EXAMPLE 2: Word Count: The objective here is to count the number of occurrences of each word by using key-value pairs.

Step 1:  

Copy example2 folder to /home/<net_id>/ 

cp -r /share/apps/Tutorials/Tutorial3/example2 $HOME

cd $HOME/example2

Step 2:

Place the book.txt file onto HDFS.

hadoop fs -put /share/apps/Tutorials/Tutorial3/input/book.txt /user/<net id>/

Step 3: 

Build/compile and create the package using Maven. Make sure pom.xml is present in the current directory. This command will generate the "target" directory.

/opt/maven/bin/mvn package

Step 4: 

Execute the job using the jar file created in the "target" directory.

spark-submit --class SparkWordCountExample --master yarn target/scala-0.0.1-SNAPSHOT.jar /user/<net id>/book.txt /user/<net id>/output
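
For reference, a word-count driver like the SparkWordCountExample class submitted above typically looks like the following. This is only an illustrative sketch (the actual source is included in the example2 folder and may differ in detail); it reads the input and output paths from the command-line arguments passed to spark-submit:

// Illustrative sketch only - see the example2 folder for the actual source
import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCountExample {
  def main(args: Array[String]): Unit = {
    // args(0) = input file on HDFS, args(1) = output directory on HDFS
    val conf = new SparkConf().setAppName("SparkWordCountExample")
    val sc = new SparkContext(conf)

    val counts = sc.textFile(args(0))
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    counts.saveAsTextFile(args(1))
    sc.stop()
  }
}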

Step 5:

Check the output by accessing the HDFS directories or by copying the data from HDFS to the local system.

hadoop fs -ls /user/<net_id>/output

hadoop fs -cat /user/<net_id>/output/part-00000

OR

hadoop fs -getmerge /user/<net_id>/output $HOME/output.txt

cat $HOME/output.txt


EXAMPLE 3: Example from GitHub.

Step 1:

Place the book.txt file onto HDFS.

hadoop fs -put /share/apps/Tutorials/Tutorial3/input/book.txt /user/<net id>/

 

Step 2: Create a directory to work with the example from GitHub.

 

mkdir $HOME/example3

cd $HOME/example3

 

Step 3: Clone a git repository to create a local copy of the code.

 

git clone https://github.com/sryza/simplesparkapp.git

cd $HOME/example3/simplesparkapp/

Step 4: Build/compile using Maven. Make sure pom.xml is present in the current directory. This command will generate the jar file "sparkwordcount-0.0.1-SNAPSHOT.jar" in the "target" directory.

 

/opt/maven/bin/mvn package

cd $HOME/example3/simplesparkapp/target

Step 5: Execute the job. Make sure the input file has been copied into the specified HDFS path.

 

spark-submit --class com.cloudera.sparkwordcount.SparkWordCount --master yarn sparkwordcount-0.0.1-SNAPSHOT.jar /user/<net id>/book.txt 2

 

EXAMPLE 4: This example shows Spark with Python.

Step 1:  

Copy example4 folder to /home/<net_id>/ 

cp -r /share/apps/Tutorials/Tutorial3/example4 $HOME

cd $HOME/example4

Step 2:

Place the animals.txt file onto HDFS.

hadoop fs -put /share/apps/Tutorials/Tutorial3/input/animals.txt /user/<net id>/

Step 3: 

Execute the process.

spark-submit  sort.py  /user/<netid>/animals.txt   /user/<netid>/sparkoutput

Step 4:

Check the output by accessing the HDFS directory.

hadoop fs -ls /user/<net_id>/sparkoutput

hadoop fs -cat /user/<net_id>/sparkoutput/part-r-00000

 

EXAMPLE 5: Spark SQL on Python.

The Spark interactive shell using Python, i.e., PySpark.

Step 1:

Place the input file onto HDFS.

hadoop fs -put /share/apps/Tutorials/Tutorial3/input/people.json /user/<net id>/

Step 2:

Run the following command on the dumbo cluster to connect to the Python Spark interactive shell.

pyspark

Step 3:

Steps to create a temporary table from the JSON file and query it.

>>> users = sqlContext.read.json("/user/<net id>/people.json");
>>> users.registerTempTable("users");
>>> over21 = sqlContext.sql("SELECT name, age FROM users WHERE age > 21 ");
>>> over21.collect();

 

EXAMPLE 6: Access Hive database through Spark.

The Spark interactive shell, i.e., spark-shell.

Step 1:

Please copy the Tutorial2 directory from '/share/apps/Tutorials/Tutorial2/' to '/home/netid/' 

cp -r /share/apps/Tutorials/Tutorial2/ $HOME

cd $HOME/Tutorial2

Step 2:

Run the following command on the dumbo cluster to connect to the Spark interactive shell.

spark-shell

Step 3:

Steps to create and load data into a Hive table from the Spark shell.

scala> sqlContext.sql("CREATE TABLE IF NOT EXISTS <net id>.messages(user STRING, post STRING, time BIGINT, country STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'");
scala> sqlContext.sql("load data local inpath '/home/<net id>/Tutorial2/user_posts.txt' overwrite into table <net id>.messages");
scala> val result = sqlContext.sql("FROM <net id>.messages SELECT user, post, time");
scala> result.show()

 

Please contact hpc@nyu.edu to learn more

 
