The Spark project is an actively developed open-source engine for data analytics on clusters using Scala, Python, or Java. It offers map, filter, and reduce operations over in-memory collections, data from local files, or data from HDFS; unlike standard map-reduce frameworks, it can cache intermediate results across the cluster, which can yield order-of-magnitude improvements over standard map-reduce when implementing iterative algorithms. I’ve been using it lately and have been really impressed. However, as with many cool projects in the “big data” space, the chain of dependencies needed to get a working installation can be daunting.
In this post, we’ll walk through setting up Spark to run on a stock Fedora 18 installation. We’ll also build the Mesos cluster manager so that we can run Spark jobs under Mesos, and we’ll build Hadoop with support for Mesos (so that we’ll have the option to run standard Hadoop MapReduce jobs under Mesos as well). By following these steps, you should be up and running with Spark quickly and painlessly.

## Preliminary steps
You’ll first need to install some packages so that you’ll have all of the build dependencies for the packages you’ll want to install. You may have some of these already installed, depending on what Fedora installation type you’ve used and what packages you’ve already installed, but this list should cover bringing you from a minimal Fedora installation to one that can support building Mesos, Hadoop, and Spark.
First, we’ll install some essential tools that might not already be on your system:
```
sudo yum install -y git wget patch tar autoconf automake libtool bzip2
```
Then, we’ll install some compilers, language environments, build tools, and support libraries:
```
sudo yum install -y gcc gcc-c++ python scala
sudo yum install -y java-devel python-devel zlib-devel libcurl-devel openssl-devel
sudo yum install -y ant maven maven2
```
We should now have all of the dependencies installed to build Mesos, Hadoop, and Spark.
## Building Mesos
Set `JAVA_HOME` in your environment:

```
export JAVA_HOME=/usr/lib/jvm/java-1.7.0/
```
Then create a working directory and clone the Mesos source repository. This will take a little while, since it’s a large (~180 MB) repository:

```
mkdir ~/build
cd ~/build
git clone https://github.com/apache/mesos.git
```
We’re going to be working from the `0.12.x` branch of the Mesos repository. Check that out:

```
cd mesos
git checkout 0.12.x
```
Now we can actually build Mesos:
```
./bootstrap
./configure --with-webui --with-included-zookeeper --disable-perftools
make && sudo make install
```
## Building and running Hadoop
You can run Spark jobs under Mesos without using Hadoop at all, and Spark running under Mesos can get data from HDFS even if the HDFS service isn’t itself running under Mesos. However, in order to run Hadoop MapReduce jobs under Mesos, we’ll need to build a patched version of Hadoop. If you already have Hadoop installed and running and are interested simply in running Spark against data in HDFS, you can skip this step, but if you don’t have Hadoop installed, installing it this way is simple and will give you greater flexibility in the future.
Building the patched Hadoop is straightforward, since patches and build scripts are bundled with Mesos. Simply run the following command from within your `mesos` directory and follow the prompts:

```
./hadoop/TUTORIAL.sh
```
It will explain what it is doing while downloading, patching, and building Hadoop. It will then run an example Hadoop MapReduce job to make sure everything is working properly and remind you that you’ll need to make changes to the MapReduce configuration before running MapReduce jobs on your Mesos cluster. Since we aren’t going to be running Hadoop MapReduce jobs under Mesos right away, we’ll skip that step for now. However, we will be making some minor configuration changes.
First, while you’re still in the `mesos` directory, change to the directory where you built your patched Hadoop:

```
cd hadoop/hadoop-0.20.205.0/
```
Then edit `conf/hadoop-env.sh` with your favorite editor, replacing the commented-out line that sets `JAVA_HOME` with the following:

```
export JAVA_HOME=/usr/lib/jvm/java-1.7.0/
```
Finally, edit `conf/core-site.xml` and add the following property:

```
<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost:9100</value>
</property>
```
Now we’re ready to run Hadoop. Check to make sure that you can `ssh` to your local machine without a password, since the Hadoop startup scripts will do this several times and you will get tired of typing your password. If you can’t and you already have an SSH key pair, append your public key to your `authorized_keys` file and make sure it’s only readable by you. If you don’t already have an SSH key pair on your machine, you can simply type in the following commands for a local-only setup:

```
ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
chmod 600 ~/.ssh/authorized_keys
```
Now you’re ready to format your HDFS storage:
```
./bin/hadoop namenode -format
```
Next, start all of the Hadoop daemons:
```
cd ~/build/mesos/hadoop/hadoop-0.20.205.0/
./bin/start-all.sh
```
If you’re running as `root` (shame on you!), you’ll get an error that the `-jvm` option isn’t supported. You can work around this error by running `sed -i 's/jvm //' bin/hadoop` or, if you prefer to do things manually, by editing `bin/hadoop` and replacing the line that reads

```
HADOOP_OPTS="$HADOOP_OPTS -jvm server $HADOOP_DATANODE_OPTS"
```

with this line:

```
HADOOP_OPTS="$HADOOP_OPTS -server $HADOOP_DATANODE_OPTS"
```
If you had to make this change, run `bin/stop-all.sh` and then `bin/start-all.sh` again.
## Storing an input file to HDFS
As long as we’re still thinking about Hadoop, let’s put some data there so we can process it after we’ve built Spark. For our Spark example, we’ll work with a Common Log Format file (like an Apache HTTPD `AccessLog`). (You probably have one sitting around somewhere.) Mine is called `~/access_log`. Here’s how I’ll load it into HDFS:

```
./bin/hadoop fs -mkdir input
./bin/hadoop fs -put ~/access_log input
```
I can check that it’s actually there:
```
./bin/hadoop fs -ls input | grep access_log
```
and, sure enough, I’ll get something like this in return:
```
-rw-r--r-- 3 root supergroup 792036736 2013-04-10 14:32 /user/root/input/access_log
```
Shame on me, too, I guess. Now we’re ready to build Spark and run some example jobs against these data.
## Building Spark
First, fetch the source tarball:
```
wget http://spark-project.org/files/spark-0.7.0-sources.tgz
tar -xzvf spark-0.7.0-sources.tgz
cd spark-0.7.0
```
In order to be able to fetch input data from HDFS, Spark needs to know what version of Hadoop you’re running before you build it. To do this, either open `project/SparkBuild.scala` in your favorite editor and change the Hadoop version to 0.20.205.0, or run the following command:

```
sed -i 's/= "1.0.4"/= "0.20.205.0"/' project/SparkBuild.scala
```
Now we’ll use the Scala build tool to compile Spark:
```
./sbt/sbt package
```
We can now run a simple job to make sure Spark works. The `SparkPi` example uses a Monte Carlo method to approximate Pi:

```
export SCALA_HOME=/usr/share/scala
./run spark.examples.SparkPi local 1000
```
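If you’re curious how that Monte Carlo approach works, here’s a minimal sketch of the same idea in PySpark (my own illustration, not the bundled `SparkPi` code): sample random points in the unit square and count the fraction that land inside the unit circle. If you want to experiment, you can paste it into the `pyspark` shell we’ll start in the next section.

```
# Rough sketch of the Monte Carlo idea behind SparkPi (not the bundled code).
# Points are sampled uniformly from the unit square; the fraction that lands
# inside the unit circle approaches pi/4.
import random

def inside(_):
    x, y = random.random(), random.random()
    return 1 if x * x + y * y < 1.0 else 0

samples = 1000000
# sc is the SparkContext that the pyspark shell provides
count = sc.parallelize(range(samples), 8).map(inside).reduce(lambda a, b: a + b)
print("Pi is roughly %f" % (4.0 * count / samples))
```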
Now you should have a working Spark installation and can run some jobs locally. We’ll look at an example interaction with Spark next.
## Running an example Spark job
The Spark shells use the `MASTER` environment variable to determine how to run jobs. Set it to `local` for now:

```
export MASTER=local
```
This will run your jobs locally, using only one thread. We’ll also want to allow Spark to use more memory if we’re going to work with a large dataset:

```
export SPARK_MEM=2g
```

If you want to use more or less than 2 gigabytes, change that setting appropriately; it uses the same format as Java heap size arguments (for example, `512m` or `4g`). Now we’ll start up the Python Spark environment:

```
./pyspark
```
You’ll find yourself in a Python REPL with a `SparkContext` object initialized in the `sc` variable. Now, create a Spark resilient distributed dataset from the lines in the log file we uploaded to HDFS (noting, of course, that your URL may be different depending on how you stored the file):

```
log_entries = sc.textFile("hdfs://localhost:9100/user/root/input/access_log")
```
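Before doing anything fancy, it’s worth a quick sanity check that Spark can actually read the file from HDFS. A couple of basic RDD actions will do:

```
# These actions force Spark to actually read the file from HDFS
log_entries.count()   # total number of lines in the log file
log_entries.take(1)   # a list containing just the first log entry
```

If either of these fails, double-check the HDFS URL above. (And if you skipped the Hadoop steps entirely, `sc.textFile` will also accept a plain local path to the log file.)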
We’ll then write a short series of operations to count the number of log entries for each remote host:
```
ips = log_entries.map(lambda s: s.split()[0])
ips.map(lambda ip: (ip, 1)).reduceByKey(lambda a, b: a + b).collect()
```
This example won’t benefit all that much from Spark’s caching support, but it will run faster in parallel. If you have many cores in your machine, try using them! Here’s how you’d set `MASTER` if you wanted to use 8 threads:

```
export MASTER='local[8]'
```
Exit out of `pyspark` and try running the example again with more threads, if you feel so inclined.
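If you’d like to see where caching does pay off, here’s one sketch (my own example, assuming the same `log_entries` RDD as above and standard Common Log Format lines): split the log lines once, cache the result, and then run several different aggregations against the cached RDD.

```
# Split each line into fields once and cache the result in memory;
# drop any malformed lines that are too short to index safely
fields = log_entries.map(lambda line: line.split()).filter(lambda f: len(f) > 8)
fields.cache()

# Query 1: requests per remote host (as before, but against the cached RDD)
by_host = fields.map(lambda f: (f[0], 1)).reduceByKey(lambda a, b: a + b).collect()

# Query 2: requests per HTTP status code (the ninth field of a
# whitespace-split Common Log Format entry)
by_status = fields.map(lambda f: (f[8], 1)).reduceByKey(lambda a, b: a + b).collect()
```

The first `collect()` pays the cost of reading and splitting the whole file; later queries should hit the in-memory copy, provided `SPARK_MEM` is large enough to hold it.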
## Next steps
Now that you have Spark up and running, here are some things to consider trying:
- learn more about Spark by working through some examples
- read up on the Python or Scala interfaces to programming Spark
- run Spark under Mesos or across several nodes without Mesos
- check out some other cool projects that work with Spark, like Shark (an implementation of Apache Hive that uses Spark in the query planner) and Bagel (an implementation of Google’s Pregel system for graph processing)