The Spark project is an actively developed open-source engine for data analytics on clusters using Scala, Python, or Java. It offers map, filter, and reduce operations over in-memory collections, data from local files, or data taken from HDFS, but unlike standard map-reduce frameworks, it offers the opportunity to cache intermediate results across the cluster (and can thus offer orders-of-magnitude improvements over standard map-reduce when implementing iterative algorithms). I’ve been using it lately and have been really impressed. However, as with many cool projects in the “big data” space, the chain of dependencies to get a working installation can be daunting.
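To make the caching point concrete, here is a toy, plain-Python analogy. It is not Spark code, and the function names are made up for illustration: an iterative job either re-parses its input on every pass or parses it once and reuses the result. In Spark itself, the corresponding step is calling cache() on an RDD, which asks Spark to keep that dataset in memory once it has been computed.

```python
def run_iterations(raw_lines, iterations, cache):
    """Toy iterative job (hypothetical, not Spark's API).

    Returns (result, number_of_parse_calls) so the cost of recomputation
    is visible.
    """
    parse_calls = 0

    def parse():
        # Stand-in for an expensive transformation such as parsing records
        nonlocal parse_calls
        parse_calls += 1
        return [int(x) for x in raw_lines]

    cached = parse() if cache else None  # parse once up front when caching
    total = 0
    for _ in range(iterations):
        data = cached if cache else parse()  # recompute unless cached
        total += sum(data)
    return total, parse_calls


print(run_iterations(["1", "2", "3"], 10, cache=False))
print(run_iterations(["1", "2", "3"], 10, cache=True))
```

With ten iterations, the uncached run parses the input ten times and the cached run parses it once. Both produce the same total.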

In this post, we’ll walk through setting up Spark to run on a stock Fedora 18 installation. We’ll also build the Mesos cluster manager so that we can run Spark jobs under Mesos, and we’ll build Hadoop with support for Mesos (so that we’ll have the option to run standard Hadoop MapReduce jobs under Mesos as well). By following these steps, you should be up and running with Spark quickly and painlessly.

Preliminary steps

You’ll first need to install some packages so that you’ll have all of the build dependencies for the packages you’ll want to install. You may have some of these already installed, depending on what Fedora installation type you’ve used and what packages you’ve already installed, but this list should cover bringing you from a minimal Fedora installation to one that can support building Mesos, Hadoop, and Spark.

First, we’ll install some essential tools that might not already be on your system:

sudo yum install -y git wget patch tar autoconf automake autotools libtool bzip2

Then, we’ll install some compilers, language environments, build tools, and support libraries:

sudo yum install -y gcc gcc-c++ python scala
sudo yum install -y java-devel python-devel zlib-devel libcurl-devel openssl-devel
sudo yum install -y ant maven maven2

We should now have all of the dependencies installed to build Mesos, Hadoop, and Spark.
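A quick sanity check at this point can save you from a confusing build failure later. The sketch below assumes that the packages above put these commands on your PATH. The list is my own illustrative mapping from package names to executables (for example, gcc-c++ provides g++, java-devel provides javac, and maven provides mvn), so adjust it to suit your system.

```python
import shutil

# Illustrative list of commands the yum packages above should provide
# (an assumption: package names and command names differ in places).
EXPECTED_TOOLS = ["git", "wget", "patch", "tar", "autoconf", "automake",
                  "libtool", "bzip2", "gcc", "g++", "python", "scala",
                  "javac", "ant", "mvn"]


def missing_tools(tools=EXPECTED_TOOLS):
    """Return the commands from `tools` that are not found on PATH."""
    return [tool for tool in tools if shutil.which(tool) is None]


if __name__ == "__main__":
    missing = missing_tools()
    print("missing:", ", ".join(missing) if missing else "none")
```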

Building Mesos

Set JAVA_HOME in your environment:

export JAVA_HOME=/usr/lib/jvm/java-1.7.0/

Then create a working directory and clone the Mesos source repository. This will take a little while, since it’s a large (~180 MB) repository:

mkdir ~/build
cd ~/build
git clone https://github.com/apache/mesos.git

We’re going to be working from the 0.12.x branch of the Mesos repository. Check that out:

cd mesos
git checkout 0.12.x

Now we can actually build Mesos:

./bootstrap
./configure --with-webui --with-included-zookeeper --disable-perftools
make && sudo make install

Building and running Hadoop

You can run Spark jobs under Mesos without using Hadoop at all, and Spark running under Mesos can get data from HDFS even if the HDFS service isn’t itself running under Mesos. However, in order to run Hadoop MapReduce jobs under Mesos, we’ll need to build a patched version of Hadoop. If you already have Hadoop installed and running and are interested simply in running Spark against data in HDFS, you can skip this step, but if you don’t have Hadoop installed, installing it this way is simple and will give you greater flexibility in the future.

Building the patched Hadoop is straightforward, since patches and build scripts are bundled with Mesos. Simply run the following command from within your mesos directory and follow the prompts:

./hadoop/TUTORIAL.sh

It will explain what it is doing while downloading, patching, and building Hadoop. It will then run an example Hadoop MapReduce job to make sure everything is working properly and remind you that you’ll need to make changes to the MapReduce configuration before running MapReduce jobs on your Mesos cluster. Since we aren’t going to be running Hadoop MapReduce jobs under Mesos right away, we’ll skip that step for now. However, we will be making some minor configuration changes.

First, while you’re still in the mesos directory, change to the directory where you built your patched Hadoop:

cd hadoop/hadoop-0.20.205.0/

Then edit conf/hadoop-env.sh with your favorite editor, replacing the commented-out line that sets JAVA_HOME with the following:

export JAVA_HOME=/usr/lib/jvm/java-1.7.0/

Finally, edit conf/core-site.xml and add the following property:

<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost:9100</value>
</property>
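If you’d rather script this edit than open an editor, Python’s standard xml.etree.ElementTree module can add the property, or update it if it’s already present. This is a sketch that assumes core-site.xml has the usual <configuration> root element, and the function name is my own. Note that ElementTree drops the XML declaration, processing instructions, and comments when it rewrites the file, so keep a backup.

```python
import xml.etree.ElementTree as ET


def set_property(xml_text, name, value):
    """Return Hadoop config XML with property `name` set to `value`,
    adding a new <property> element if none exists."""
    root = ET.fromstring(xml_text)  # the <configuration> element
    for prop in root.findall("property"):
        if (prop.findtext("name") or "").strip() == name:
            value_elem = prop.find("value")
            if value_elem is None:
                value_elem = ET.SubElement(prop, "value")
            value_elem.text = value
            break
    else:  # no existing property with this name
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return ET.tostring(root, encoding="unicode")

# Example (back up conf/core-site.xml first):
# from pathlib import Path
# path = Path("conf/core-site.xml")
# path.write_text(set_property(path.read_text(), "fs.default.name",
#                              "hdfs://localhost:9100"))
```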

Now we’re ready to run Hadoop. Check to make sure that you can ssh to your local machine without a password, since the Hadoop startup scripts will do this several times and you will get tired of typing your password. If you can’t and you already have an SSH key pair, append your public key to your authorized_keys file and make sure it’s only readable by you. If you don’t already have an SSH key pair on your machine, you can simply type in the following commands for a local-only setup:

ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
chmod 600 ~/.ssh/authorized_keys
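If you want to double-check the “only readable by you” part, the sketch below inspects the file’s permission bits with os.stat. It treats any group or other permission as too permissive, which matches the chmod 600 above. The helper names are hypothetical.

```python
import os
import stat


def mode_is_owner_only(mode):
    """True if a file mode grants no permissions to group or others."""
    return (mode & (stat.S_IRWXG | stat.S_IRWXO)) == 0


def check_authorized_keys(path="~/.ssh/authorized_keys"):
    """Report whether the authorized_keys file exists and is owner-only."""
    path = os.path.expanduser(path)
    if not os.path.exists(path):
        return "missing"
    return "ok" if mode_is_owner_only(os.stat(path).st_mode) else "too permissive"


if __name__ == "__main__":
    print(check_authorized_keys())
```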

Now you’re ready to format your HDFS storage:

./bin/hadoop namenode -format

Next, start all of the Hadoop daemons:

cd ~/build/mesos/hadoop/hadoop-0.20.205.0/
./bin/start-all.sh

If you’re running as root (shame on you!), you’ll get an error that the -jvm option isn’t supported. You can work around this error by running sed -i 's/jvm //' bin/hadoop or — if you prefer to do things manually — editing bin/hadoop and replacing the line that reads

HADOOP_OPTS="$HADOOP_OPTS -jvm server $HADOOP_DATANODE_OPTS"

with this line:

HADOOP_OPTS="$HADOOP_OPTS -server $HADOOP_DATANODE_OPTS"

If you had to make this change, run bin/stop-all.sh and then bin/start-all.sh again.
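For those who prefer Python to sed, the function below performs the same substitution as sed -i 's/jvm //' bin/hadoop. It removes the first occurrence of "jvm " on each line, which turns "-jvm server" into "-server". Like the sed command, it will also change any other line that happens to contain that text, so it’s worth diffing the result. The function name is just for illustration.

```python
def strip_jvm_flag(script_text):
    """Mimic `sed 's/jvm //'`: drop the first "jvm " on every line,
    turning "-jvm server" into "-server"."""
    return "".join(line.replace("jvm ", "", 1)
                   for line in script_text.splitlines(keepends=True))

# Example (run from the Hadoop directory; keep a copy of bin/hadoop first):
# from pathlib import Path
# script = Path("bin/hadoop")
# script.write_text(strip_jvm_flag(script.read_text()))
```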

Storing an input file to HDFS

As long as we’re still thinking about Hadoop, let’s put some data there so we can process it after we’ve built Spark. For our Spark example, we’ll work with a file in Common Log Format (like an Apache HTTPD access log). (You probably have one sitting around somewhere.) Mine is called ~/access_log. Here’s how I’ll load it into HDFS:

./bin/hadoop fs -mkdir input
./bin/hadoop fs -put ~/access_log input

I can check that it’s actually there:

./bin/hadoop fs -ls input | grep access_log

and, sure enough, I’ll get something like this in return:

-rw-r--r--   3 root supergroup  792036736 2013-04-10 14:32 /user/root/input/access_log
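From left to right, the columns in that listing are: permissions, replication factor, owner, group, size in bytes, modification date and time, and path. If you want to check listings from a script, a small parser like the following works on lines shaped like the one above. It’s a sketch, and the field names are my own.

```python
from typing import NamedTuple


class HdfsEntry(NamedTuple):
    permissions: str
    replication: str  # kept as text; directories show "-" here
    owner: str
    group: str
    size_bytes: int
    date: str
    time: str
    path: str


def parse_ls_line(line):
    """Split one `hadoop fs -ls` output line into its eight columns.
    maxsplit=7 keeps the final path column intact."""
    perms, repl, owner, group, size, date, time, path = line.split(None, 7)
    return HdfsEntry(perms, repl, owner, group, int(size), date, time, path.rstrip())
```

For the line above, size_bytes is 792036736, or roughly 755 MiB.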

Shame on me, too, I guess. Now we’re ready to build Spark and run some example jobs against these data.

Building Spark

First, fetch the source tarball:

wget http://spark-project.org/files/spark-0.7.0-sources.tgz
tar -xzvf spark-0.7.0-sources.tgz
cd spark-0.7.0

In order to fetch input data from HDFS, Spark needs to know which version of Hadoop you’re running before you build it. To set this, either open project/SparkBuild.scala in your favorite editor and change the Hadoop version string from "1.0.4" to "0.20.205.0", or run the following command:

sed -i 's/= "1.0.4"/= "0.20.205.0"/' project/SparkBuild.scala
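The sed command silently does nothing if the string it looks for isn’t there (for example, if a later Spark release changes its default Hadoop version), and the build would then quietly keep its default. This slightly more defensive Python sketch makes the same substitution but refuses to proceed unless it finds exactly one match. The function name is hypothetical.

```python
def set_hadoop_version(build_text, old="1.0.4", new="0.20.205.0"):
    """Swap `= "old"` for `= "new"` (as the sed command does), but raise
    unless the pattern occurs exactly once."""
    pattern = '= "%s"' % old
    count = build_text.count(pattern)
    if count != 1:
        raise ValueError("expected one %r in the build file, found %d" % (pattern, count))
    return build_text.replace(pattern, '= "%s"' % new)

# Example (from the spark-0.7.0 directory):
# from pathlib import Path
# build = Path("project/SparkBuild.scala")
# build.write_text(set_hadoop_version(build.read_text()))
```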

Now we’ll use the Scala build tool to compile Spark:

./sbt/sbt package

We can now run a simple job to make sure Spark works. The SparkPi example uses a Monte Carlo method to approximate Pi:

export SCALA_HOME=/usr/share/scala
./run spark.examples.SparkPi local 1000
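The idea behind SparkPi is easy to reproduce without a cluster. If you pick random points in the square from -1 to 1 on each axis, the fraction that land inside the unit circle approaches π/4. The plain-Python sketch below splits the sampling into chunks (“slices”, the map step) and sums the hits (the reduce step), loosely mirroring how the Spark example parallelizes the work. As far as I can tell, the trailing argument to SparkPi (1000 above) is the number of slices rather than the raw number of samples, but treat that as an assumption and check the example’s source for your version.

```python
import random
from functools import reduce


def count_inside(samples, rng):
    """Count random points in [-1, 1] x [-1, 1] that land inside the unit circle."""
    inside = 0
    for _ in range(samples):
        x, y = rng.uniform(-1, 1), rng.uniform(-1, 1)
        if x * x + y * y <= 1:
            inside += 1
    return inside


def estimate_pi(slices, samples_per_slice=10000, seed=0):
    """Sample in `slices` chunks (the "map"), add up the hits (the "reduce"),
    and scale: hits / total approaches pi / 4."""
    rng = random.Random(seed)
    hits = reduce(lambda a, b: a + b,
                  map(lambda _: count_inside(samples_per_slice, rng), range(slices)))
    return 4.0 * hits / (slices * samples_per_slice)


if __name__ == "__main__":
    print(estimate_pi(slices=10))
```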

Now you should have a working Spark installation and can run some jobs locally. We’ll look at an example interaction with Spark next.

Running an example Spark job

The Spark shells use the MASTER environment variable to determine how to run jobs. Set it to local for now:

export MASTER=local

This will run your jobs locally, using only one thread. We’ll also want to enable Spark to use more memory if we’re going to work with a large dataset:

export SPARK_MEM=2g

If you want to use more or less than 2 gigabytes, change that setting appropriately; it uses the same format as Java heap size arguments. Now we’ll start up the Python Spark environment:
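Java heap sizes are written as a number with an optional k, m, or g suffix (as in -Xmx2g), and the suffixes are binary multiples. If you generate SPARK_MEM from a script, a small validator like this hypothetical helper catches malformed values such as "2 GB" before they reach the JVM.

```python
import re

# Binary multiples used by Java heap-size suffixes
_UNITS = {"": 1, "k": 1024, "m": 1024 ** 2, "g": 1024 ** 3}


def heap_size_bytes(setting):
    """Convert a Java -Xmx-style size such as "2g", "512m" or "1024k" to bytes."""
    match = re.fullmatch(r"(\d+)([kmg]?)", setting.strip().lower())
    if match is None:
        raise ValueError("not a Java heap size: %r" % setting)
    number, unit = match.groups()
    return int(number) * _UNITS[unit]
```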

./pyspark

You’ll find yourself in a Python REPL with a SparkContext object initialized in the sc variable. Now, create a Spark resilient distributed dataset from the lines in the log file we uploaded to HDFS (noting, of course, that your URL may be different depending on how you stored the file):

log_entries = sc.textFile("hdfs://localhost:9100/user/root/input/access_log")
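The URL is the fs.default.name value from conf/core-site.xml followed by an absolute HDFS path. Relative HDFS paths, like the input directory we created earlier, are resolved under the user’s HDFS home directory, /user/<username>. That’s why the file above appears at /user/root/input/access_log. Here is a hypothetical helper for building the URL:

```python
import posixpath


def hdfs_url(default_fs, user, path):
    """Join fs.default.name and an HDFS path into a full URL.
    Relative paths resolve against the HDFS home directory /user/<user>."""
    if not path.startswith("/"):
        path = posixpath.join("/user", user, path)
    return default_fs.rstrip("/") + path
```

For example, hdfs_url("hdfs://localhost:9100", getpass.getuser(), "input/access_log") would give the URL for your own account, assuming your HDFS user name matches your local login.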

We’ll then write a short series of operations to count the number of log entries for each remote host:

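# The remote host is the first whitespace-separated field of each log line
ips = log_entries.map(lambda s: s.split()[0])
# Pair each host with a count of 1, sum the counts per host, and return the pairs to the driver
ips.map(lambda ip: (ip, 1)).reduceByKey(lambda a, b: a + b).collect()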
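To sanity-check the Spark result on a small sample, the same computation fits in a few lines of plain Python: split each line on whitespace, take the first field (the remote host in Common Log Format), and count. One design difference is that this sketch skips blank lines, whereas s.split()[0] in the Spark version would raise an IndexError on one. If your log has blank lines, adding filter(lambda s: s.strip()) before the map should have the same effect in PySpark.

```python
from collections import Counter


def count_hosts(lines):
    """Count entries per remote host: the first whitespace-separated field
    of each Common Log Format line. Blank lines are skipped."""
    return Counter(line.split()[0] for line in lines if line.strip())

# Example on a local sample (e.g. made with: head -n 10000 ~/access_log > ~/sample_log):
# import os
# with open(os.path.expanduser("~/sample_log")) as f:
#     print(count_hosts(f).most_common(10))
```

Running both on the same sample, dict() of the collect() output should equal the Counter returned here.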

This example won’t benefit all that much from Spark’s caching support, but it will run faster in parallel. If you have many cores in your machine, try using them! Here’s how you’d set MASTER if you wanted to use 8 threads:

export MASTER='local[8]'

Exit out of pyspark and try running the example again with more threads, if you feel so inclined.

Next steps

Now that you have Spark up and running, here are some things to consider trying:
