I’m speaking at Spark Summit today about using Spark to analyze operational data from the Fedora project. Here are some links to further resources related to my talk:

You should also check out my team’s Silex library, which contains useful code factored out of real Spark applications we’ve built in Red Hat’s Emerging Technology group. It includes a lot of cool functionality, but the part I mentioned in the talk is this handy interface for preprocessing JSON data before inferring a schema.

In this post, we’ll see how to make a simple transformer for Spark ML Pipelines. The transformer we’ll design will generate a sparse binary feature vector from an array-valued field representing a set.

Preliminaries

The first thing we’ll need to do is expose Spark’s user-defined type for vectors. This will enable us to write a user-defined data frame function that returns a Spark vector. (We could also implement our own user-defined type, but reusing Spark’s, which is currently private to Spark, will save us some time. By the time you read this, the type may be part of Spark’s public API — be sure to double-check!)

package org.apache.spark.hacks {
  // make VectorUDT available outside of Spark code
  type VectorType = org.apache.spark.mllib.linalg.VectorUDT
}

Imports

Here are the imports we’ll need for the transformer and support code. I’ll use VEC for Spark vectors to avoid confusion with Scala’s Vector type. We’ll assume that the VectorType code from above is available on your project’s classpath.

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

import org.apache.spark.ml.param._
import org.apache.spark.ml.Transformer

import org.apache.spark.mllib.linalg.{Vector => VEC, Vectors}

import org.apache.spark.hacks.VectorType

Transformer and support code

Most of the ML pipeline classes distributed with Spark follow the convention of putting groups of related pipeline stage parameters in a trait. We’ll do this as well, declaring a trait for the three parameters that our transformer will use: the name of the input column, the name of the output column, and the maximum number of elements our sparse vector can hold. We’ll also define a convenience method to return a triple of the parameter values we care about.

trait SVParams extends Params {
  val inputCol = new Param[String](this, "inputCol", "input column")
  val outputCol = new Param[String](this, "outputCol", "output column")
  val vecSize = new IntParam(this, "vecSize", "maximum sparse vector size")

  def pvals(pm: ParamMap) = (
    // this code can be cleaner in versions of Spark after 1.3
    pm.get(inputCol).getOrElse("topicSet"),
    pm.get(outputCol).getOrElse("features"),
    pm.get(vecSize).getOrElse(128)
  )
}

Note that Spark 1.4 supports calling getOrElse directly on a ParamMap instance, so you can slightly simplify the code in pvals if you don’t care about source compatibility with Spark 1.3.

Here’s what the actual transformer implementation looks like:

class SetVectorizer(override val uid: String)
    extends Transformer with SVParams {
  val VT = new org.apache.spark.hacks.VectorType()

  def transformSchema(schema: StructType, params: ParamMap) = {
    val outc = params.get(outputCol).getOrElse("features")
    StructType(schema.fields ++ Seq(StructField(outc, VT, true)))
  }

  def transform(df: DataFrame, params: ParamMap) = {
    val (inCol, outCol, maxSize) = pvals(params)
    df.withColumn(outCol, callUDF({ a: Seq[Int] =>
      Vectors.sparse(maxSize, a.toArray, Array.fill(a.size)(1.0))
    }, VT, df(inCol)))
  }
}

The first thing we do in the transformer class is declare an instance of VectorType to use in other data frame type declarations later in the class. The transformSchema method returns the schema that results from applying this transformer to a given data frame; it creates a new schema that includes all of the fields from the original frame plus a Vector-valued field named by the outputCol parameter. Finally, the transform method creates a new data frame with an additional column (again, named with the value of the outputCol parameter); its values are the result of applying a user-defined function to each row of the data frame, taking its argument from the input column. The function itself simply creates a sparse binary vector from an array-backed set, so that the set Array(1, 2, 4, 8) would become a sparse vector with the elements at indices 1, 2, 4, and 8 set to 1 and everything else set to 0.
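That vectorization step is easy to check outside of Spark. Here is a sketch of the same set-to-sparse-representation logic as a plain Scala function; setToSparse is a hypothetical helper of my own, not part of the transformer above, and it sorts the indices because sparse vector constructors generally expect them in increasing order:

```scala
// Sketch: turn an array-backed set into the (indices, values) pair
// that a sparse binary vector is built from. setToSparse is a
// hypothetical helper, not part of the transformer code above.
def setToSparse(set: Seq[Int], maxSize: Int): (Array[Int], Array[Double]) = {
  require(set.forall(i => i >= 0 && i < maxSize),
          s"set elements must be in [0, $maxSize)")
  val indices = set.distinct.sorted.toArray // sparse indices must be increasing
  (indices, Array.fill(indices.length)(1.0))
}

val (indices, values) = setToSparse(Seq(8, 1, 4, 2), 16)
// indices: Array(1, 2, 4, 8); values: Array(1.0, 1.0, 1.0, 1.0)
```

The require call is the sort of bounds check you would want in a production version of the transformer, as noted below.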

The code above is a reasonable starting point for your own transformers, but you’ll want to add error checking to code you use in production: at a minimum, you’d need to validate the schema of the input data frame (to ensure that expected columns exist and are of the correct type), verify that the output column name doesn’t already exist in the data frame, and make sure no input array has more than vecSize elements. I hope this code is helpful as you develop your own pipeline stages!

This post will show you an extremely simple way to make quick-and-dirty Bokeh plots from data you’ve generated in Spark, but the basic technique is generally applicable to any data that you’re generating in some application that doesn’t necessarily link in the Bokeh libraries.

Getting started

We’ll need to have a recent version of Bokeh installed and some place to put our data. If you don’t already have Bokeh installed, use virtualenv to get it set up:

virtualenv ~/.bokeh-venv
source ~/.bokeh-venv/bin/activate
pip install bokeh numpy pandas

This will download and build a nontrivial percentage of the Internet. Consider getting some coffee or catching up with a colleague while it does its thing.

The next thing we’ll need is some place to stash data we’ve generated. I’ll use a very simple service that I developed just for this kind of application, but you can use whatever you want as long as it will let you store and retrieve JSON data from a given URL.

With all that in place, we’re ready to make a simple plot.

A basic static-HTML plot

We’re going to use Bokeh to construct a basic line graph in a static HTML file. The catch is that instead of specifying the data statically, we’re going to specify it as a URL, which the page will poll so it can update the plot when it changes. Here’s what a script to construct that kind of plot looks like:

#!/usr/bin/env python

from bokeh.plotting import figure, output_file, show
from bokeh.models.sources import AjaxDataSource
output_file("json.html", title="data polling example")
source = AjaxDataSource(data_url="http://localhost:4091/tag/foo", polling_interval=1000)
p = figure()
p.line('x', 'y', source=source)
show(p)

Note that the above example assumes you’re using the Firkin service as your data store. If you’re using something else, you will probably need to change the data_url parameter to the AjaxDataSource constructor.

Publishing and updating data

Now we’ll fire up the Firkin server and post some data. From a checkout of the Firkin code, run sbt server/console and then create a connection to the server using the client library:

val client = new com.freevariable.firkin.Client("localhost", 4091)

You can now supply some values for x and y. We’ll start simple:

/* publish an object to http://localhost:4091/tag/foo */
client.publish("foo", """{"x": [1,2,3,4,5,6,7,8,9,10], "y": [2,4,6,8,10,12,14,16,18,20]}""")

If you don’t already have json.html from the previous step loaded in your browser, fire it up. You should see a plot that looks like this:

plot of y=2x for 1..10

If we update the data stored at this URL, the plot will update automatically. Try it out; publish a new data set:

client.publish("foo", """{"x": [1,2,3,4,5,6,7,8,9,10], "y": [2,4,6,8,10,12,14,16,18,30]}""")

The plot will refresh, reflecting the updated y value:

plot of y=2x for 1..9 and y=3x for x=10

(Again, if you’re using some other JSON object cache to store your data sets, the principles will be the same but the syntax won’t be.)

Connecting Bokeh to Spark

Once we have a sink for JSON data, it’s straightforward to publish data to it from Spark code. Here’s a simple example, using json4s to serialize the RDDs:

import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

// assume that spark is a SparkContext object, declared elsewhere
val x = spark.parallelize(1 to 100)
// x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>

val y = x.map(_ / 4)
// y: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>

val json = ("x" -> x.collect.toList) ~ ("y" -> y.collect.toList)

val client = new com.freevariable.firkin.Client("localhost", 4091)

client.publish("data", compact(render(json)))

After we do this, the data will be available at http://localhost:4091/tag/data.

Alternate approaches

The approach described so far in this post was designed to provide the simplest possible way to make a basic plot backed by a relatively small amount of dynamic data. (If we have data to plot that we can’t store in memory on a single node, we’ll need something more sophisticated.) It is suitable for prototyping or internal use. Other approaches, like the two below, might make more sense for other situations:

  1. Bokeh offers several alternate language bindings, including one for Scala. This means that you can define plots and supply data for them from within applications that link these libraries in. This is the approach that spark-notebook takes, for example.
  2. Another possibility is setting up the Bokeh server and publishing plots and data to it. The Bokeh server offers several additional capabilities, including support for scalable visualizations (e.g., via sampling large datasets) and support for multiple authenticated users.

Most people set personal and professional goals. If you work in software, your near-term professional goals might sound like this:

  • Ship the next version of our team’s product without any undiscovered crashing bugs
  • Improve the performance of the caching layer by 10%
  • Become a committer on some particular open-source project

Those probably sound pretty reasonable, right? They aren’t vague wishes, they’re goals. They probably aren’t totally unrealistic, and you’ll know you’ve achieved them when some objective, measurable thing happens. (They’re SMART, in other words.)

We’ll come back to these goals. First, though, I’d like to digress and talk about bike racing, and specifically about the kinds of goals an amateur racer might set for a year. Maybe those would look like this:

  • Stay healthy and avoid injuries all season
  • Improve my one-minute power output by 0.5 W/kg
  • Upgrade my racing category

These are more or less analogous to the professional goals. They’re objective, measurable, and reasonable. However, they’re also at least partially out of your control. You can’t totally prevent yourself from getting sick or crashing. Your one-minute power might be right at the limits of your genetic capabilities (or maybe your season shapes up so that it makes more sense to focus on improving other aspects of your fitness). And the category upgrade is up to an official in your local cycling association — you can get an automatic upgrade by winning races, but whether or not you win is also largely out of your control.

I first read about the idea of process and outcome goals in Joe Friel’s The Cyclist’s Training Bible, but I’ve found them extremely helpful in talking about professional goals as well. The idea is that there are two kinds of goals: outcome goals, which are results, and process goals, which relate to how you prepare and execute. Of course, you’re ultimately interested in outcomes (winning races, shipping great products, becoming recognized as an authority in your field), but outcomes are always to some extent out of your control.

Instead of focusing on outcomes for goals, it makes more sense to focus on what you can control: the processes that support those outcomes. So, in the bike case, you could recast the outcome goals as:

  • Make sure I get at least 7 hours of sleep a night and avoid sketchy racers in corners or sprints
  • Do plyometric exercises at least twice a week in the offseason
  • Identify key races that I can do well in and make sure to show up for them well-rested and with adequate nutrition; then focus on avoiding particular tactical mistakes I’ve made in past races

The software goals I mentioned at the beginning of the article are also outcome goals. You can’t guarantee that your product won’t have any undiscovered crashing bugs, but you can sure put processes in place so that you’re more likely to discover crashing bugs before you ship. Similarly, a 10% performance improvement sounds great, but it might require unjustifiable effort or be unachievable (conversely, it might turn out that 10% improvement is insufficient to improve the behavior of the whole system)! Community leadership positions always involve political as well as meritocratic considerations; while you can make a case for yourself as a good candidate, you can’t force people to vote for you.

Recast as process goals, the software goals might look like this:

  • Ensure that no new commit to our product introduces untested code, use property-based tests for all internal data structures, and incorporate fuzzing of every public interface into the QA process.
  • Profile the caching layer under a range of conditions and document the hot spots, identifying algorithmic and implementation-level avenues for performance improvement. Add performance testing to CI.
  • Participate in the project community, answering questions on the mailing list every week and submitting at least one nontrivial patch a week.

It’s easy to imagine the rest of your career and think about outcomes you want to see: earning a prestigious position, achieving acknowledged impact in your department or company, or writing a popular framework (or book).1 But those goals are often pretty far off from what you need to do to get there. Setting process goals supports the outcomes that you want to achieve. But the really remarkable thing is that they do so much more than that. By focusing on improving your preparation and execution, you’re making yourself a better engineer (or bike racer) even if you don’t get the outcomes you want right away — and making it more likely you’ll see great outcomes in the future.


  1. Outcome goals are a particularly dangerous trap if you’re coming from an environment like academia or open-source development, where outcome-related metrics like tenure, citation count, and number of GitHub “stars” are never far from your evaluation of your own career. (The frictionless recognitions afforded by social media “likes” and “favorites” bring this headache to a much broader audience.)

Elasticsearch has offered Hadoop InputFormat and OutputFormat implementations for quite some time. These made it possible to process Elasticsearch indices with Spark just as you would any other Hadoop data source. Here’s an example of this in action, taken from Elastic’s documentation:

Elasticsearch in Spark via Hadoop
val conf = new JobConf()
conf.set("es.resource", "radio/artists")
conf.set("es.query", "?q=me*")
val esRDD = sc.hadoopRDD(conf,
                classOf[EsInputFormat[Text, MapWritable]],
                classOf[Text], classOf[MapWritable])
val docCount = esRDD.count()

However, the latest versions of the Elasticsearch client offer more idiomatic Spark support to generate RDDs (and Spark data frames) directly from Elasticsearch indices without explicitly dealing with the Hadoop formatting machinery. This support is extremely cool, but the documentation is still a bit sparse and I hit a few snags using it. In this post, I’ll cover the problems I encountered and explain how to make it work.

Dependencies

Native Spark support is available in elasticsearch-hadoop version 2.1.0. If you’d tried to set this up last week, you’d have needed to run against a snapshot build to get support for Spark data frames. Fortunately, the most recent beta release of elasticsearch-hadoop includes this support. Add the following library dependency to your project:

sbt dependency configuration
libraryDependencies += "org.elasticsearch" %% "elasticsearch-spark" % "2.1.0.Beta4"

Since we won’t be using the lower-level Hadoop support, that’s the only library we’ll need.

Configuration

We need to supply some configuration in order to create RDDs directly from indices. elasticsearch-spark expects this configuration to be available as parameters on our Spark context object. Here’s an example of how you’d do that in a vanilla Spark application:

Configuring an ES node list
val conf = new org.apache.spark.SparkConf()
 .setMaster("local[*]")
 .setAppName("es-example")
 .set("es.nodes", "localhost")

val spark = new org.apache.spark.SparkContext(conf)

Note that since the default value for es.nodes is localhost, you don’t need to set it at all if you’re running against a local Elasticsearch instance. If you were running Elasticsearch behind a reverse proxy running on proxyhost on port 80, you might specify proxyhost:80 instead (and you would probably also want to set es.nodes.discovery to false so your client wouldn’t discover nodes that weren’t reachable outside of the private network). There are other options you can set, but this is the most critical.
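For the reverse-proxy case just described, the configuration might look like the following sketch (proxyhost is the hypothetical proxy from above):

```scala
// Hypothetical setup: Elasticsearch is reachable only through a
// reverse proxy at proxyhost:80, so we point es.nodes at the proxy
// and disable node discovery to keep the client from trying to
// contact nodes on the private network.
val conf = new org.apache.spark.SparkConf()
  .setMaster("local[*]")
  .setAppName("es-behind-proxy")
  .set("es.nodes", "proxyhost:80")
  .set("es.nodes.discovery", "false")

val spark = new org.apache.spark.SparkContext(conf)
```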

If you’re using the Spark app skeleton provided by the Silex library, you can add Elasticsearch configuration in config hooks, as follows:

import com.redhat.et.silex.app.AppCommon
import org.elasticsearch.spark._

object IndexCount extends AppCommon {
  addConfig { conf =>
    conf
     .set("es.nodes", sys.env.getOrElse("ES_NODES", "localhost"))
     .set("es.nodes.discovery", "false")
  }

  override def appName = "example ES app"

  def appMain(args: Array[String]) {
    args match {
      case Array(index, indexType) =>
        val resource = s"$index/$indexType"
        val count = context.esRDD(resource).count()
        Console.println(s"$resource has $count documents")
      case _ =>
        Console.println("usage: IndexCount index type")
    }
  }
}

Generating RDDs

elasticsearch-spark offers implicit conversions to make RDDs directly from Elasticsearch resources. (Careful readers may have already seen one in action!) The following code example assumes that you’ve initialized spark as a SparkContext with whatever Elasticsearch configuration data you need to set:

Generating RDDs from ES resources
// bring implicit conversions into scope to extend our
// SparkContext with the esRDD method, which creates a
// new RDD backed by an ES index or query
import org.elasticsearch.spark._

// telemetry-20080915/sar is an ES index of system telemetry data
// (collected from the sar tool)
val sarRDD = spark.esRDD("telemetry-20080915/sar")

// => sarRDD: org.apache.spark.rdd.RDD[(String, scala.collection.Map[String,AnyRef])] = ScalaEsRDD[4] at RDD at AbstractEsRDD.scala:17

This is far more convenient than using the Hadoop InputFormat, but the interface still leaves a little to be desired. Our data may have a sensible schema, but the result we’re getting back is as general as possible: just a collection of key-value pairs, where the keys are strings and the values are maps from strings to arbitrary (and arbitrarily-typed) values. If we want more explicit structure in our data, we’ll need to bring it into Spark in a different way.
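In the meantime, pulling typed values out of those string-to-AnyRef maps means runtime type checks. Here is a minimal sketch of a safe accessor; getTyped is my own helper for illustration, not part of elasticsearch-spark:

```scala
import scala.reflect.ClassTag

// Safely extract a typed field from the Map[String, AnyRef] values
// that esRDD returns; yields None on a missing key or a type
// mismatch. getTyped is a hypothetical helper, not an
// elasticsearch-spark API.
def getTyped[T](doc: Map[String, AnyRef], key: String)
               (implicit ct: ClassTag[T]): Option[T] =
  doc.get(key).collect { case v: T => v }

val doc: Map[String, AnyRef] =
  Map("nodename" -> "host1", "cpus" -> Integer.valueOf(4))

getTyped[String](doc, "nodename") // => Some("host1")
getTyped[String](doc, "cpus")     // => None (the value is an Integer)
```

The ClassTag makes the type test in the pattern match work at runtime despite erasure.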

Generating data frames

In order to benefit from the structure in our data, we can generate data frames that store schema information. The most recent release of elasticsearch-spark supports doing this directly. Here’s what that looks like, again assuming that spark is an appropriately-configured SparkContext:

Generating data frames from ES resources
// bring implicit conversions into scope to extend our
// SQLContexts with the esDF method, which creates a
// new data frame backed by an ES index or query
import org.elasticsearch.spark.sql._

// construct a SQLContext
val sqlc = new org.apache.spark.sql.SQLContext(spark)

// telemetry-20080915/sar is an ES index of system telemetry data
// (collected from the sar tool)
val sarDF = sqlc.esDF("telemetry-20080915/sar")

// => org.apache.spark.sql.DataFrame = [_metadata: struct<file-date:timestamp,generated-by:string,generated-by-version:string,machine:string,nodename:string,number-of-cpus:int,release:string,sysdata-version:float,sysname:string>, cpu-load: string, cpu-load-all: string, disk: string, filesystems: string, hugepages: struct<hugfree:bigint,hugused:bigint,hugused-percent:double>, interrupts: string, interrupts-processor: string, io: struct<io-reads:struct<bread:double,rtps:double>,io-writes:struct<bwrtn:double,wtps:double>,tps:double>, kernel: struct<dentunusd:bigint,file-nr:bigint,inode-nr:bigint,pty-nr:bigint>, memory: struct<active:bigint,buffers:bigint,bufpg:double,cached:bigint,campg:double,commit:bigint,commit-percent:double,dirty:bigint,frmpg:double,inactive:bigint,memfree:bigint,memu...

Once we have a data frame for our ES resource, we can run queries against it in Spark:

sarDF.registerTempTable("sardf")

sqlc.sql("select _metadata from sardf")

Next steps

Elasticsearch’s Spark support offers many additional capabilities, including storing Spark RDDs to Elasticsearch and generating RDDs from queries. Check out the upstream documentation for more!

Silex is a small library of helper code intended to make it easier to build real-world Spark applications;1 most of it is factored out from applications we’ve developed internally at Red Hat. We have a couple of long-term goals for the Silex project:

  • We want to make it very easy for us to spin up on new data-processing problems without spending a lot of time dealing with accidental complexity, and
  • we want to have a generally-useful standard library atop Spark that provides primitives and solutions to simplify common tasks and reflects best practices as we discover them.

Part of having a library that people will want to use is having a quality test suite and continuous integration. There are some tricks to developing automated tests for Spark applications, but even keeping those in mind may not be sufficient to let you test your Spark apps in a hosted CI environment. This post will show you how to bridge the gap.

Continuous integration challenges

Since we want to make it easy to review contributions to Silex, I set up Travis CI to watch branches and pull requests. Travis CI is a great service, but if you’re used to running tests locally, you might have some problems running Spark-based tests under hosted CI environments. Here’s what we learned:

  • When running tests locally, creating a Spark context with local[*] (in order to use all of the available cores on your machine) might be a good idea. However, a hosted CI environment may offer you a lot of cores but relatively little memory, so your tests might be killed because each Spark executor has nontrivial memory overhead. Instead, consider limiting your code to use fewer cores.
  • If you’re using Kryo serialization and have set the spark.kryoserializer.buffer.mb property to something large (perhaps because you often have serialization buffer crashes in production), don’t be surprised if you run out of memory while running in CI. Spark doesn’t share serializers between threads, so you could be allocating a huge buffer for each thread even if your test code doesn’t need to serialize anything all that large.2
  • Spark SQL (at least as of version 1.3.1) defaults to creating 200 partitions for shuffles. This is probably a good starting place for real-world data, but it’s overkill for test cases running on a single machine. Furthermore, since each partition has some extra memory overhead, it’s another possible cause of OOMEs.

Fixing the problems

Silex provides an application skeleton trait, and we use a class extending this trait to package up Spark and SQL contexts for test cases. That class isn’t currently part of the public Silex API, but you can see what it looks like here:

excerpt from app.scala
private [silex] class TestConsoleApp(val suppliedMaster: String = "local[2]") extends AppCommon {
  override def master = suppliedMaster
  override def appName = "console"

  addConfig( {(conf: SparkConf) => conf.set("spark.kryoserializer.buffer.mb", "2")})

  def appMain(args: Array[String]) {
    // this never runs
  }
}

As you can see, we allow users to specify a Spark master URL but default to using two cores locally. Furthermore, we use the addConfig function from AppCommon — which takes a SparkConf and returns a modified SparkConf — to ensure that our Kryo buffer size is the Spark default of 2 mb, rather than the larger Silex default.

If you’re used to writing test code that exercises Spark, you probably already have boilerplate (using something like ScalaTest’s BeforeAndAfterEach) to set up and tear down a Spark context for each test case. We set the Spark SQL property to control parallelism in data frame and SQL shuffles in the test setup code itself:

excerpt from app.scala
import org.scalatest._

import com.redhat.et.silex.app.TestConsoleApp

trait PerTestSparkContext extends BeforeAndAfterEach {
  self: BeforeAndAfterEach with Suite =>

  private var app: TestConsoleApp = null

  def context = app.context
  def sqlContext = app.sqlContext

  override def beforeEach() {
    app = new TestConsoleApp()
    System.clearProperty("spark.master.port")

    app.sqlContext.setConf("spark.sql.shuffle.partitions", "10")

    app.context
  }

  override def afterEach() {
    app.context.stop
  }
}

Conclusion

Good CI is one of those things that makes it fun to develop software, but the accidental differences between a hosted environment and your local environment can be totally frustrating. By taking into account the constraints you’re likely to see when running in a resource-limited container on a CI server, you can help to ensure that your tests only fail when your code is actually broken.


  1. “Silex” is Latin for “flint,” which seemed like a good name for something that was intended to help people make things with sparks.

  2. Thanks to Erik Erlandson for noticing this (it even affected him in a test case he was running locally).

Natural join is a useful special case of the relational join operation (and is extremely common when denormalizing data pulled in from a relational database). Spark’s DataFrame API provides an expressive way to specify arbitrary joins, but it would be nice to have some machinery to make the simple case of natural join as easy as possible. Here’s what a natural join needs to do:

  1. For relations R and S, identify the columns they have in common, say c1 and c2;
  2. join R and S on the condition that R.c1 == S.c1 and R.c2 == S.c2; and
  3. project away the duplicated columns.
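The column bookkeeping in steps 1 and 3 can be sketched on plain column-name sequences, with no Spark dependency at all; outputColumns is a hypothetical helper for illustration only:

```scala
// Sketch: compute the output column order for a natural join from
// the two relations' column names. Common columns come first (in
// the left relation's order), then left-only columns, then
// right-only columns. Not part of the post's Spark code.
def outputColumns(left: Seq[String], right: Seq[String]): Seq[String] = {
  val common = left.toSet intersect right.toSet
  left.filter(common.contains) ++
    left.filterNot(common.contains) ++
    right.filterNot(common.contains)
}

outputColumns(Seq("c1", "c2", "a"), Seq("c1", "b", "c2"))
// => Seq("c1", "c2", "a", "b")
```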

So, in Spark, a natural join would look like this:

natJoinExample.scala
/* r and s are DataFrames, declared elsewhere */

val joined = r.join(s, r("c1") === s("c1") && r("c2") === s("c2"))
val common = Set("c1", "c2")
val outputColumns = Seq(r("c1"), r("c2")) ++
                    r.columns.collect { case c if !common.contains(c) => r(c) } ++
                    s.columns.collect { case c if !common.contains(c) => s(c) }
val projected = joined.select(outputColumns : _*)

We can generalize this as follows (note that joining two frames with no columns in common will produce an empty frame):

natjoin.scala
import org.apache.spark.sql.DataFrame
import scala.language.implicitConversions

trait NaturalJoining {
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.types._

  /**
   * Performs a natural join of two data frames.
   *
   * The frames are joined by equality on all of the columns they have in common.
   * The resulting frame has the common columns (in the order they appeared in <code>left</code>), 
   * followed by the columns that only exist in <code>left</code>, followed by the columns that 
   * only exist in <code>right</code>.
   */
  def natjoin(left: DataFrame, right: DataFrame): DataFrame = {
    val leftCols = left.columns
    val rightCols = right.columns

    val commonCols = leftCols.toSet intersect rightCols.toSet

    if(commonCols.isEmpty)
      left.limit(0).join(right.limit(0))
    else
      left
        .join(right, commonCols.map {col => left(col) === right(col) }.reduce(_ && _))
        .select(leftCols.collect { case c if commonCols.contains(c) => left(c) } ++
                leftCols.collect { case c if !commonCols.contains(c) => left(c) } ++
                rightCols.collect { case c if !commonCols.contains(c) => right(c) } : _*)
  }
}

Furthermore, we can make this operation available to any DataFrame via implicit conversions:

case class DFWithNatJoin(df: DataFrame) extends NaturalJoining {
  def natjoin(other: DataFrame): DataFrame = super.natjoin(df, other)
}

/** 
 * Module for natural join functionality.  Import <code>NaturalJoin._</code> for static access 
 * to the <code>natjoin</code> method, or import <code>NaturalJoin.implicits._</code> to pimp 
 * Spark DataFrames with a <code>natjoin</code> member method. 
 */
object NaturalJoin extends NaturalJoining {
  object implicits {
    implicit def dfWithNatJoin(df: DataFrame) = DFWithNatJoin(df)
  }
}

If you’re interested in using this code in your own projects, simply add the Silex library to your project and import com.redhat.et.silex.frame._. (You can also get Silex via bintray.)

One of the great things about Apache Spark is that you can experiment with new analyses interactively. In the past, I’ve used the sbt console to try out new data transformations and models; the console is especially convenient since you can set it up as a custom Scala REPL with your libraries loaded and some test fixtures already created.

However, some of Spark’s coolest new functionality depends on aspects of Scala reflection that aren’t compatible with how sbt uses classloaders for tasks, so you’re liable to see MissingRequirementError exceptions when you try to run code that exercises parts of Spark SQL or the DataFrame API from the sbt console.1

You can certainly run a regular Scala REPL or the spark-shell, but doing so sacrifices a lot of the flexibility of running from sbt: every time your code or dependencies change, you’ll need to package your application and set your classpath, ensuring that all of your application classes and dependencies are available to the REPL application.

Fortunately, there’s an easier way: you can make a small application that runs a Scala REPL set up the way you like and ask sbt how to set its classpath. First, write up a simple custom Scala REPL, like this one:

ReplApp.scala
object ReplApp {
  import scala.tools.nsc.interpreter._
  import scala.tools.nsc.Settings

  def main(args: Array[String]) {
    val repl = new ILoop {
      override def loop(): Unit = {
        // ConsoleApp is just a simple container for a Spark context
        // and configuration
        val app = new com.redhat.et.silex.app.ConsoleApp()
        intp.addImports("org.apache.spark.SparkConf")
        intp.addImports("org.apache.spark.SparkContext")
        intp.addImports("org.apache.spark.SparkContext._")
        intp.addImports("org.apache.spark.rdd.RDD")

        intp.bind("app", app)
        intp.bind("spark", app.context)
        intp.bind("sqlc", app.sqlContext)
        intp.addImports("sqlc._")

        super.loop()
      }
    }

    val settings = new Settings
    settings.Yreplsync.value = true

    settings.usejavacp.value = true

    repl.process(settings)
  }
}

The ReplApp application sets up a Scala REPL with imports for some common Spark classes and bindings to a SparkContext and SqlContext. (The ConsoleApp object is just a simple wrapper for Spark context and configuration; see the Silex project, where my team is collecting and generalizing infrastructure code from Spark applications, for more details — or just change this code to set up a SparkContext as you see fit.)

In order to run this application, you’ll need to set its classpath, and sbt gives you a way to find out exactly what environment it would use, so you can run the application manually.2 First, make sure you have a copy of sbt-extras either in your repository or somewhere pointed to by SBT in your environment. Then, create a shell script that looks like this:

repl.sh
#!/bin/sh

# set SBT to the location of a current sbt-extras script,
# or bundle one in your repository
export SBT=${SBT:-./sbt}
export SCALA_VERSION=$(${SBT} "export scalaVersion" | tail -1)
export APP_CP=$(${SBT} -batch -q "export compile:dependencyClasspath" | tail -1)
export JLINE_CP=$(find $HOME/.ivy2 | grep org.scala-lang/jline | grep ${SCALA_VERSION}.jar$ | tail -1)

${SBT} package && java -cp ${APP_CP}:${JLINE_CP} ReplApp
stty sane

You can then run repl.sh and get a Scala REPL that has all of your app’s dependencies and classes loaded and will let you experiment with structured data manipulation in Spark.
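One small note on the script: the SBT variable uses the shell’s ${VAR:-default} parameter expansion, so it falls back to a bundled ./sbt unless you point it somewhere else in your environment. A quick sketch of how that behaves (the /opt path below is just an illustrative example, not a real install location):

```shell
# repl.sh relies on the ${VAR:-default} form of parameter expansion:
# use $SBT if it is set, otherwise fall back to ./sbt.
unset SBT
echo "${SBT:-./sbt}"       # SBT unset: prints the fallback, ./sbt

SBT=/opt/sbt-extras/sbt    # hypothetical location for illustration
echo "${SBT:-./sbt}"       # SBT set: prints /opt/sbt-extras/sbt
```

This is why the comment in repl.sh suggests either bundling sbt-extras in your repository or exporting SBT before running the script.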


  1. Frustratingly, apps that use these features will work, since the classes Scala reflection depends on will be loaded by the bootstrap classloader, and test cases will work as long as you have sbt fork a new JVM to execute them! Unfortunately, sbt cannot currently fork a new JVM to run a console.

  2. The run-main task is the right way to run most applications from sbt, but it seems to be somewhat flaky when launching interactive console applications.

Last night I had a crazy realization: I could probably replace the majority of what my team hopes to accomplish with standup meetings, design documents, project management apps, and social code sharing features with a single tool.

Before I reveal what this tool is, let’s consider why most team collaboration is suboptimal. Everyone who has been in standup meetings or read groupwide status reports has probably seen the following status-update antipatterns; in fact, unless you’re a better teammate than I, you’ve probably been guilty of each of these yourself:

  • “This week I read 253 emails in my inbox, 1478 emails from mailing list folders, and sampled 75 messages from my spam folder to make sure none of them were spuriously filed. I also wrote a net 498 lines of code, brewed eight pots of coffee and five shots of espresso, spent 246 minutes in meetings, and urinated twenty-one times.”
  • “I’m still working on the same thing I was working on last week. None of you are close enough to this problem to know why it is so tough and I’m too defeated to explain it to you, so feel free to assume that I am either heroically deep in the weeds or have gotten totally ratholed on something irrelevant.”
  • “I discovered that this graph search problem could be better modeled as a language-recognition query, and spent some time investigating efficient representations before coming up with a really clever approach involving boolean satisfiability — there was a paper in the SIGYAWN ‘75 proceedings that used a similar technique; maybe you read it too — … [fifteen minutes pass] … and improved performance by 0.57%.”

It doesn’t seem like it should be, but getting the level of detail right in status updates is hard, and it’s hard for stupid reasons. Your big accomplishments for the week are easy to explain and you don’t want to make it seem like you did nothing, so you start accounting for every second since last Monday. It’s too depressing to consider that the bug that’s been keeping you awake for the last few weeks still isn’t solved and you don’t want to spend time talking about all of the dim trails you investigated, so you’ll Eeyore it up to get out of the meeting and back to work. You’re so deep in a problem that you think everyone else needs to know about a bunch of tiny details and can’t tell that they’ve all fallen asleep.

To some extent, all of these seemingly-different problems stem from too much status and not enough context. But even if we recast status reports as context updates, we’ve traded the problem of finding an appropriate level of status detail for finding an appropriate amount of context. How do we decide what an appropriate amount of context is?

Think of the best and worst talks you’ve ever seen. A good talk doesn’t exhaustively describe the work the author did, but it places it in enough context so that the audience has some way to evaluate the claims, get interested, and decide if the work is worth investigating further. A bad talk is vague, disorganized, gets bogged down in irrelevant details, or only makes sense to people who already know everything about its subject.

Now think about the best talks you’ve ever given. Giving a good talk — even more so than writing a good article or essay — hones and clarifies your argument. You have to step out of your bubble and see your work from the perspective of your audience, because you have just a few minutes to catch their attention before they tune out and start checking their email while waiting for the next speaker. Now, I’ve given a lot of lectures, academic talks, and conference presentations, but I’ve found that giving informal, internal tech talks on some interesting aspect of something I’ve been working on has paid off disproportionately.

So I don’t want my team to give each other status updates. Instead, I want us to give regular short talks. And the medium that I want us to use as we develop these talks is a shared slide deck, updated throughout the week.1 In spite of the amazingly persistent ubiquity of speakers reading through their wall-of-text decks like apparatchiks in dystopian fiction, slides aren’t a substitute for talks. However, a good slide deck supports a good talk, and designing a slide deck helps to make sure that you have the right amount of context in your talk, because the wrong amount of context in a talk leaves some obvious odors in the deck:

  • Oh, you need to drop down to 18 point type to cover everything you wanted here? That’s probably a clue that you’re covering something that would be better addressed in further discussion or a subsequent deep dive.
  • Does this slide feel totally bare? Maybe you need more background here.
  • Could you draw a picture and replace this paragraph? Could you write a sentence and replace this code excerpt?
  • If you had to explain this to a customer or an executive — really, to anyone who’s sharp and engaged but not a domain expert — right now, where would you start? How would you keep them interested?

My team is doing data science, so a lot of what we wind up delivering turns out to be presentations and visual explanations. As a result, this practice could be even more beneficial for us than it might be for other groups. Rigorously working out explanations among ourselves is certain to pay off in the future, when we have to present results internally or externally. By using a shared deck, we can annotate with questions, link to more detail elsewhere, and rapidly iterate on our presentations as we have improved results or come up with clearer explanations.

(Thanks to RJ Nowling, who suggested the idea of a regular “analytics review,” like a code review, and got me thinking along these lines.)


  1. I take talks and slides very seriously and was initially resistant to using web-based slideware: most slideware is optimized for making ugly, bullet-laden decks and even the best web-based office suites — although they are amazing technical achievements — are still sort of clunky to use compared to desktop applications. But the flexibility of having a collaboratively-edited deck is huge.

Over eight years ago, Richard WM Jones wrote a great but disheartening article about his experience serving as a technical reviewer for an infamous book about OCaml. The post made quite an impression on me at the time and I’ve often recalled it over the years whenever opportunities to do prepress reviews have landed in my inbox. Briefly, Jones was asked to use terrible tools (Microsoft Word) to deal with stilted, error-ridden prose surrounding unidiomatic and broken code. For his trouble, he got an hourly wage that would have represented a slight raise over what I made mowing my neighbors’ lawns in high school.

Recently, I was invited to review a book under even more outrageous terms. The project in question was the second edition of a book on a subject I understand pretty well.1 I didn’t respond to their first invitation to review because:

  • I assumed they’d cast a fairly wide net;
  • my spare time is pretty well saturated these days and reviewing technical prose is a lot of slow, precise work;
  • in my opinion, this publisher releases a lot of subpar products and I don’t want my name and reputation associated with a mediocre book;
  • the earlier edition of this book had received extremely poor reviews; and
  • the compensation offered was simply risible: credit in the frontmatter of the finished book, a paper or ebook copy of the finished book, and an additional ebook of my choice.

Now, there are always tradeoffs involved in choosing contract work: one can weigh compensation against loving one’s work, having pride in products, having a prestigious position, or enjoying a low-stress work environment. But I’m pretty sure I know what the skill set that would enable someone to be a competent reviewer of a book like this one is worth, and the people who have it aren’t likely to be motivated to volunteer for a for-profit corporation in exchange for credit and free ebooks. For me, a demand on my spare time that depends on my professional expertise (and takes away from time I might spend with my family, bicycling, catching up on sleep, or learning something new) but offers no compensation is not particularly compelling, and it becomes far less compelling when it seems like an opportunity to be involved with a product that I’m not likely to believe in and that I probably wouldn’t be proud to put my name on. So when the publisher sent me a rather snippy follow-up message, I wrote back, gently indicating that I would not be able to do the review and that they’d likely have trouble finding a qualified reviewer who was willing to work for free.

One wonders what publishers expect to accomplish with prepress technical reviews. In Jones’s case, he provided a lot of feedback and suggested that Apress not proceed with publishing Practical OCaml without major revisions; they ignored his objections but used his name in the final text. (At least he was paid something, I guess.) Not every publisher works this way, but the ones that do seem more interested in shifting the blame for bogus content to an external contractor than in releasing a thoroughly vetted product.

Requests to do specialized work for free are almost always insulting. However, the apparent value that some publishers ascribe to prepress review is even more insulting if you consider it from the perspective of the technical-book consumer!


  1. In fact, this publisher had invited me to write a book for them on this topic earlier this year, apparently before deciding to revise their existing offering instead.