Elasticsearch has offered Hadoop InputFormat and OutputFormat implementations for quite some time. These made it possible to process Elasticsearch indices with Spark just as you would any other Hadoop data source. Here’s an example of this in action, taken from Elastic’s documentation:

Elasticsearch in Spark via Hadoop
// imports for the Hadoop and ES-Hadoop types used below
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io.{MapWritable, Text}
import org.elasticsearch.hadoop.mr.EsInputFormat

val conf = new JobConf()
conf.set("es.resource", "radio/artists")
conf.set("es.query", "?q=me*")
val esRDD = sc.hadoopRDD(conf,
                classOf[EsInputFormat[Text, MapWritable]],
                classOf[Text], classOf[MapWritable])
val docCount = esRDD.count()

However, the latest versions of the Elasticsearch client offer more idiomatic Spark support to generate RDDs (and Spark data frames) directly from Elasticsearch indices without explicitly dealing with the Hadoop formatting machinery. This support is extremely cool, but the documentation is still a bit sparse and I hit a few snags using it. In this post, I’ll cover the problems I encountered and explain how to make it work.

Dependencies

Native Spark support is available in elasticsearch-hadoop version 2.1.0. If you’d tried to set this up last week, you’d have needed to run against a snapshot build to get support for Spark data frames. Fortunately, the most recent beta release of elasticsearch-hadoop includes this support. Add the following library dependency to your project:

sbt dependency configuration
libraryDependencies += "org.elasticsearch" %% "elasticsearch-spark" % "2.1.0.Beta4"

Since we won’t be using the lower-level Hadoop support, that’s the only library we’ll need.

Configuration

We need to supply some configuration in order to create RDDs directly from indices. elasticsearch-spark expects this configuration to be available as parameters on our Spark context object. Here’s an example of how you’d do that in a vanilla Spark application:

Configuring an ES node list
val conf = new org.apache.spark.SparkConf()
 .setMaster("local[*]")
 .setAppName("es-example")
 .set("es.nodes", "localhost")

val spark = new org.apache.spark.SparkContext(conf)

Note that since the default value for es.nodes is localhost, you don’t need to set it at all if you’re running against a local Elasticsearch instance. If you were running Elasticsearch behind a reverse proxy running on proxyhost on port 80, you might specify proxyhost:80 instead (and you would probably also want to set es.nodes.discovery to false so your client wouldn’t discover nodes that weren’t reachable outside of the private network). There are other options you can set, but this is the most critical.
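
For example, here’s what the reverse-proxy case described above might look like (proxyhost is a placeholder, not a real host):

val proxyConf = new org.apache.spark.SparkConf()
 .setMaster("local[*]")
 .setAppName("es-example")
 .set("es.nodes", "proxyhost:80")       // Elasticsearch is only reachable through the proxy
 .set("es.nodes.discovery", "false")    // don't try to contact cluster-internal nodes directly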

If you’re using the Spark app skeleton provided by the Silex library, you can add Elasticsearch configuration in config hooks, as follows:

import com.redhat.et.silex.app.AppCommon
import org.elasticsearch.spark._

object IndexCount extends AppCommon {
  addConfig { conf =>
    conf
     .set("es.nodes", sys.env.getOrElse("ES_NODES", "localhost"))
     .set("es.nodes.discovery", "false")
  }

  override def appName = "example ES app"

  def appMain(args: Array[String]) {
    args match {
      case Array(index, indexType) =>
        val resource = s"$index/$indexType"
        val count = context.esRDD(resource).count()
        Console.println(s"$resource has $count documents")
      case _ =>
        Console.println("usage: IndexCount index type")
    }
  }
}

Generating RDDs

elasticsearch-spark offers implicit conversions to make RDDs directly from Elasticsearch resources. (Careful readers may have already seen one in action!) The following code example assumes that you’ve initialized spark as a SparkContext with whatever Elasticsearch configuration data you need to set:

Generating RDDs from ES resources
// bring implicit conversions into scope to extend our
// SparkContext with the esRDD method, which creates a
// new RDD backed by an ES index or query
import org.elasticsearch.spark._

// telemetry-20080915/sar is an ES index of system telemetry data
// (collected from the sar tool)
val sarRDD = spark.esRDD("telemetry-20080915/sar")

// => sarRDD: org.apache.spark.rdd.RDD[(String, scala.collection.Map[String,AnyRef])] = ScalaEsRDD[4] at RDD at AbstractEsRDD.scala:17

This is far more convenient than using the Hadoop InputFormat, but the interface still leaves a little to be desired. Our data may have a sensible schema, but the result we’re getting back is as general as possible: just a collection of key-value pairs, where the keys are strings and the values are maps from strings to arbitrary (and arbitrarily-typed) values. If we want more explicit structure in our data, we’ll need to bring it into Spark in a different way.
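
You can still get at individual fields by key, of course. Here’s a minimal sketch that pulls one top-level field out of each document; kernel is one of the fields in this particular telemetry index, but the same pattern works for any key:

// each record is a (documentID, fieldMap) pair; look a field up by name
val kernels = sarRDD.flatMap { case (id, fields) => fields.get("kernel") }
kernels.take(5).foreach(println)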

Generating data frames

In order to benefit from the structure in our data, we can generate data frames that store schema information. The most recent release of elasticsearch-spark supports doing this directly. Here’s what that looks like, again assuming that spark is an appropriately-configured SparkContext:

Generating data frames from ES resources
// bring implicit conversions into scope to extend our
// SQLContexts with the esDF method, which creates a
// new data frame backed by an ES index or query
import org.elasticsearch.spark.sql._

// construct a SQLContext
val sqlc = new org.apache.spark.sql.SQLContext(spark)

// telemetry-20080915/sar is an ES index of system telemetry data
// (collected from the sar tool)
val sarDF = sqlc.esDF("telemetry-20080915/sar")

// => org.apache.spark.sql.DataFrame = [_metadata: struct<file-date:timestamp,generated-by:string,generated-by-version:string,machine:string,nodename:string,number-of-cpus:int,release:string,sysdata-version:float,sysname:string>, cpu-load: string, cpu-load-all: string, disk: string, filesystems: string, hugepages: struct<hugfree:bigint,hugused:bigint,hugused-percent:double>, interrupts: string, interrupts-processor: string, io: struct<io-reads:struct<bread:double,rtps:double>,io-writes:struct<bwrtn:double,wtps:double>,tps:double>, kernel: struct<dentunusd:bigint,file-nr:bigint,inode-nr:bigint,pty-nr:bigint>, memory: struct<active:bigint,buffers:bigint,bufpg:double,cached:bigint,campg:double,commit:bigint,commit-percent:double,dirty:bigint,frmpg:double,inactive:bigint,memfree:bigint,memu...

Once we have a data frame for our ES resource, we can run queries against it in Spark:

sarDF.registerTempTable("sardf")

sqlc.sql("select _metadata from sardf")

Next steps

Elasticsearch’s Spark support offers many additional capabilities, including storing Spark RDDs to Elasticsearch and generating RDDs from queries. Check out the upstream documentation for more!
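
As a quick taste of those features (reusing the telemetry index from above; the output index name below is made up), esRDD also accepts a query string, and saveToEs writes an RDD of maps or case classes back to Elasticsearch:

import org.elasticsearch.spark._

// read only the documents matching a query (the query string is passed through to ES)
val filtered = spark.esRDD("telemetry-20080915/sar", "?q=nodename:example*")

// write a collection of documents to a (hypothetical) index
val summary = spark.parallelize(Seq(Map("nodename" -> "example01", "cores" -> 8)))
summary.saveToEs("telemetry-summary/nodes")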

Silex is a small library of helper code intended to make it easier to build real-world Spark applications;1 most of it is factored out from applications we’ve developed internally at Red Hat. We have a couple of long-term goals for the Silex project:

  • We want to make it very easy for us to spin up on new data-processing problems without spending a lot of time dealing with accidental complexity, and
  • we want to have a generally-useful standard library atop Spark that provides primitives and solutions to simplify common tasks and reflects best practices as we discover them.

Part of having a library that people will want to use is having a quality test suite and continuous integration. There are some tricks to developing automated tests for Spark applications, but even keeping those in mind may not be sufficient to let you test your Spark apps in a hosted CI environment. This post will show you how to bridge the gap.

Continuous integration challenges

Since we want to make it easy to review contributions to Silex, I set up Travis CI to watch branches and pull requests. Travis CI is a great service, but if you’re used to running tests locally, you might have some problems running Spark-based tests under hosted CI environments. Here’s what we learned:

  • When running tests locally, creating a Spark context with local[*] (in order to use all of the available cores on your machine) might be a good idea. However, a hosted CI environment may offer you a lot of cores but relatively little memory, so your tests might be killed because each Spark executor has nontrivial memory overhead. Instead, consider limiting your code to use fewer cores.
  • If you’re using Kryo serialization and have set the spark.kryoserializer.buffer.mb property to something large (perhaps because you often have serialization buffer crashes in production), don’t be surprised if you run out of memory while running in CI. Spark doesn’t share serializers between threads, so you could be allocating a huge buffer for each thread even if your test code doesn’t need to serialize anything all that large.2
  • Spark SQL (at least as of version 1.3.1) defaults to creating 200 partitions for shuffles. This is probably a good starting place for real-world data, but it’s overkill for test cases running on a single machine. Furthermore, since each partition has some extra memory overhead, it’s another possible cause of OOMEs.

Fixing the problems

Silex provides an application skeleton trait, and we use a class extending this trait to package up Spark and SQL contexts for test cases. That class isn’t currently part of the public Silex API, but you can see what it looks like here:

excerpt from app.scala
private [silex] class TestConsoleApp(val suppliedMaster: String = "local[2]") extends AppCommon {
  override def master = suppliedMaster
  override def appName = "console"

  addConfig( {(conf: SparkConf) => conf.set("spark.kryoserializer.buffer.mb", "2")})

  def appMain(args: Array[String]) {
    // this never runs
  }
}

As you can see, we allow users to specify a Spark master URL but default to using two cores locally. Furthermore, we use the addConfig function from AppCommon — which takes a SparkConf and returns a modified SparkConf — to ensure that our Kryo buffer size is the Spark default of 2 mb, rather than the larger Silex default.

If you’re used to writing test code that exercises Spark, you probably already have boilerplate (using something like ScalaTest’s BeforeAndAfterEach) to set up and tear down a Spark context for each test case. We set the Spark SQL property to control parallelism in data frame and SQL shuffles in the test setup code itself:

excerpt from app.scala
import org.scalatest._

import com.redhat.et.silex.app.TestConsoleApp

trait PerTestSparkContext extends BeforeAndAfterEach {
  self: BeforeAndAfterEach with Suite =>

  private var app: TestConsoleApp = null

  def context = app.context
  def sqlContext = app.sqlContext

  override def beforeEach() {
    app = new TestConsoleApp()
    System.clearProperty("spark.master.port")

    app.sqlContext.setConf("spark.sql.shuffle.partitions", "10")

    app.context
  }

  override def afterEach() {
    app.context.stop
  }
}

Conclusion

Good CI is one of those things that makes it fun to develop software, but the accidental differences between a hosted environment and your local environment can be totally frustrating. By taking into account the constraints you’re likely to see when running in a resource-limited container on a CI server, you can help to ensure that your tests only fail when your code is actually broken.


  1. “Silex” is Latin for “flint,” which seemed like a good name for something that was intended to help people make things with sparks.

  2. Thanks to Erik Erlandson for noticing this (it even affected him in a test case he was running locally).

Natural join is a useful special case of the relational join operation (and is extremely common when denormalizing data pulled in from a relational database). Spark’s DataFrame API provides an expressive way to specify arbitrary joins, but it would be nice to have some machinery to make the simple case of natural join as easy as possible. Here’s what a natural join needs to do:

  1. For relations R and S, identify the columns they have in common, say c1 and c2;
  2. join R and S on the condition that R.c1 == S.c1 and R.c2 == S.c2; and
  3. project away the duplicated columns.

So, in Spark, a natural join would look like this:

natJoinExample.scala
/* r and s are DataFrames, declared elsewhere */

val joined = r.join(s, r("c1") === s("c1") && r("c2") === s("c2"))
val common = Set("c1", "c2")
val outputColumns = Seq(r("c1"), r("c2")) ++
                    r.columns.collect { case c if !common.contains(c) => r(c) } ++
                    s.columns.collect { case c if !common.contains(c) => s(c) }
val projected = joined.select(outputColumns : _*)

We can generalize this as follows (note that joining two frames with no columns in common will produce an empty frame):

natjoin.scala
import org.apache.spark.sql.DataFrame
import scala.language.implicitConversions

trait NaturalJoining {
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.types._

  /**
   * Performs a natural join of two data frames.
   *
   * The frames are joined by equality on all of the columns they have in common.
   * The resulting frame has the common columns (in the order they appeared in <code>left</code>), 
   * followed by the columns that only exist in <code>left</code>, followed by the columns that 
   * only exist in <code>right</code>.
   */
  def natjoin(left: DataFrame, right: DataFrame): DataFrame = {
    val leftCols = left.columns
    val rightCols = right.columns

    val commonCols = leftCols.toSet intersect rightCols.toSet

    if(commonCols.isEmpty)
      left.limit(0).join(right.limit(0))
    else
      left
        .join(right, commonCols.map {col => left(col) === right(col) }.reduce(_ && _))
        .select(leftCols.collect { case c if commonCols.contains(c) => left(c) } ++
                leftCols.collect { case c if !commonCols.contains(c) => left(c) } ++
                rightCols.collect { case c if !commonCols.contains(c) => right(c) } : _*)
  }
}

Furthermore, we can make this operation available to any DataFrame via implicit conversions:

case class DFWithNatJoin(df: DataFrame) extends NaturalJoining {
  def natjoin(other: DataFrame): DataFrame = super.natjoin(df, other)
}

/** 
 * Module for natural join functionality.  Import <code>NaturalJoin._</code> for static access 
 * to the <code>natjoin</code> method, or import <code>NaturalJoin.implicits._</code> to pimp 
 * Spark DataFrames with a <code>natjoin</code> member method. 
 */
object NaturalJoin extends NaturalJoining {
  object implicits {
    implicit def dfWithNatJoin(df: DataFrame) = DFWithNatJoin(df)
  }
}

If you’re interested in using this code in your own projects, simply add the Silex library to your project and import com.redhat.et.silex.frame._. (You can also get Silex via bintray.)
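
Here’s a rough sketch of what client code might look like once those imports are in scope (left and right are placeholder data frames with at least one column name in common):

import com.redhat.et.silex.frame._
import NaturalJoin.implicits._

// any columns that left and right share become the join keys
val joined = left.natjoin(right)

// or, without the implicit conversion:
val joined2 = NaturalJoin.natjoin(left, right)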

One of the great things about Apache Spark is that you can experiment with new analyses interactively. In the past, I’ve used the sbt console to try out new data transformations and models; the console is especially convenient since you can set it up as a custom Scala REPL with your libraries loaded and some test fixtures already created.

However, some of Spark’s coolest new functionality depends on some aspects of Scala reflection that aren’t compatible with how sbt uses classloaders for tasks, so you’re liable to see MissingRequirementError exceptions when you try to run code that exercises parts of Spark SQL or the DataFrame API from the sbt console.1

You can certainly run a regular Scala REPL or the spark-shell, but doing so sacrifices a lot of the flexibility of running from sbt: every time your code or dependencies change, you’ll need to package your application and set your classpath, ensuring that all of your application classes and dependencies are available to the REPL application.

Fortunately, there’s an easier way: you can make a small application that runs a Scala REPL set up the way you like and ask sbt how to set its classpath. First, write up a simple custom Scala REPL, like this one:

ReplApp.scala
object ReplApp {
  import scala.tools.nsc.interpreter._
  import scala.tools.nsc.Settings

  def main(args: Array[String]) {
    val repl = new ILoop {
      override def loop(): Unit = {
        // ConsoleApp is just a simple container for a Spark context
        // and configuration
        val app = new com.redhat.et.silex.app.ConsoleApp()
        intp.addImports("org.apache.spark.SparkConf")
        intp.addImports("org.apache.spark.SparkContext")
        intp.addImports("org.apache.spark.SparkContext._")
        intp.addImports("org.apache.spark.rdd.RDD")

        intp.bind("app", app)
        intp.bind("spark", app.context)
        intp.bind("sqlc", app.sqlContext)
        intp.addImports("sqlc._")

        super.loop()
      }
    }

    val settings = new Settings
    settings.Yreplsync.value = true

    settings.usejavacp.value = true

    repl.process(settings)
  }
}

The ReplApp application sets up a Scala REPL with imports for some common Spark classes and bindings to a SparkContext and SQLContext. (The ConsoleApp object is just a simple wrapper for Spark context and configuration; see the Silex project, where my team is collecting and generalizing infrastructure code from Spark applications, for more details — or just change this code to set up a SparkContext as you see fit.)

In order to run this application, you’ll need to set its classpath, and sbt gives you a way to find out exactly what environment it would be using so you can run the application manually.2 First, make sure you have a copy of sbt-extras either in your repository or somewhere pointed to by SBT in your environment. Then, create a shell script that looks like this:

repl.sh
#!/bin/sh

# set SBT to the location of a current sbt-extras script,
# or bundle one in your repository
export SBT=${SBT:-./sbt}
export SCALA_VERSION=$(${SBT} "export scalaVersion" | tail -1)
export APP_CP=$(${SBT} -batch -q "export compile:dependencyClasspath" | tail -1)
export JLINE_CP=$(find $HOME/.ivy2 | grep org.scala-lang/jline | grep ${SCALA_VERSION}.jar$ | tail -1)

${SBT} package && java -cp ${APP_CP}:${JLINE_CP} ReplApp
stty sane

You can then run repl.sh and get a Scala REPL that has all of your app’s dependencies and classes loaded and will let you experiment with structured data manipulation in Spark.


  1. Frustratingly, apps that use these features will work, since the classes Scala reflection depends on will be loaded by the bootstrap classloader, and test cases will work as long as you have sbt fork a new JVM to execute them! Unfortunately, sbt cannot currently fork a new JVM to run a console.

  2. The run-main task is the right way to run most applications from sbt, but it seems to be somewhat flaky when launching interactive console applications.

Last night I had a crazy realization: I could probably replace the majority of what my team hopes to accomplish with standup meetings, design documents, project management apps, and social code sharing features with a single tool.

Before I reveal what this tool is, let’s consider why most team collaboration is suboptimal. Everyone who has been in standup meetings or read groupwide status reports has probably seen the following status-update antipatterns; in fact, unless you’re a better teammate than I, you’ve probably been guilty of each of these yourself:

  • “This week I read 253 emails in my inbox, 1478 emails from mailing list folders, and sampled 75 messages from my spam folder to make sure none of them were spuriously filed. I also wrote a net 498 lines of code, brewed eight pots of coffee and five shots of espresso, spent 246 minutes in meetings, and urinated twenty-one times.”
  • “I’m still working on the same thing I was working on last week. None of you are close enough to this problem to know why it is so tough and I’m too defeated to explain it to you, so feel free to assume that I am either heroically deep in the weeds or have gotten totally ratholed on something irrelevant.”
  • “I discovered that this graph search problem could be better modeled as a language-recognition query, and spent some time investigating efficient representations before coming up with a really clever approach involving boolean satisfiability — there was a paper in the SIGYAWN ‘75 proceedings that used a similar technique; maybe you read it too — … [fifteen minutes pass] … and improved performance by 0.57%.”

It doesn’t seem like it should be, but getting the level of detail right in status updates is hard, and it’s hard for stupid reasons. Your big accomplishments for the week are easy to explain and you don’t want to make it seem like you did nothing, so you start accounting for every second since last Monday. It’s too depressing to consider that the bug that’s been keeping you awake for the last few weeks still isn’t solved and you don’t want to spend time talking about all of the dim trails you investigated, so you’ll Eeyore it up to get out of the meeting and back to work. You’re so deep in a problem that you think everyone else needs to know about a bunch of tiny details and can’t tell that they’ve all fallen asleep.

To some extent, all of these seemingly-different problems stem from too much status and not enough context. But even if we recast status reports as context updates, we’ve traded the problem of finding an appropriate level of status detail for finding an appropriate amount of context. How do we decide what an appropriate amount of context is?

Think of the best and worst talks you’ve ever seen. A good talk doesn’t exhaustively describe the work the author did, but it places it in enough context so that the audience has some way to evaluate the claims, get interested, and decide if the work is worth investigating further. A bad talk is vague, disorganized, gets bogged down in irrelevant details, or only makes sense to people who already know everything about its subject.

Now think about the best talks you’ve ever given. Giving a good talk — even more so than writing a good article or essay — hones and clarifies your argument. You have to step out of your bubble and see your work from the perspective of your audience, because you have just a few minutes to catch their attention before they tune out and start checking their email while waiting for the next speaker. Now, I’ve given a lot of lectures, academic talks, and conference presentations, but I’ve found that giving informal, internal tech talks on some interesting aspect of something I’ve been working on has paid off disproportionately.

So I don’t want my team to give each other status updates. Instead, I want us to give regular short talks. And the medium that I want us to use as we develop these talks is a shared slide deck, updated throughout the week.1 In spite of the amazingly persistent ubiquity of speakers reading through their wall-of-text decks like apparatchiks in dystopian fiction, slides aren’t a substitute for talks. However, a good slide deck supports a good talk, and designing a slide deck helps to make sure that you have the right amount of context in your talk, because the wrong amount of context in a talk leaves some obvious odors in the deck:

  • Oh, you need to drop down to 18 point type to cover everything you wanted here? That’s probably a clue that you’re covering something that would be better addressed in further discussion or a subsequent deep dive.
  • Does this slide feel totally bare? Maybe you need more background here.
  • Could you draw a picture and replace this paragraph? Could you write a sentence and replace this code excerpt?
  • If you had to explain this to a customer or an executive — really, to anyone who’s sharp and engaged but not a domain expert — right now, where would you start? How would you keep them interested?

My team is doing data science, so a lot of what we wind up delivering turns out to be presentations and visual explanations. As a result, this practice could be even more beneficial for us than it might be for other groups. Rigorously working out explanations among ourselves is certain to pay off in the future, when we have to present results internally or externally. By using a shared deck, we can annotate with questions, link to more detail elsewhere, and rapidly iterate on our presentations as we have improved results or come up with clearer explanations.

(Thanks to RJ Nowling, who suggested the idea of a regular “analytics review” and got me thinking along these lines.)


  1. I take talks and slides very seriously and was initially resistant to using web-based slideware: most slideware is optimized for making ugly, bullet-laden decks and even the best web-based office suites — although they are amazing technical achievements — are still sort of clunky to use compared to desktop applications. But the flexibility of having a collaboratively-edited deck is huge.

Over eight years ago, Richard WM Jones wrote a great but disheartening article about his experience serving as a technical reviewer for an infamous book about OCaml. The post made quite an impression on me at the time and I’ve often recalled it over the years whenever opportunities to do prepress reviews have landed in my inbox. Briefly, Jones was asked to use terrible tools (Microsoft Word) to deal with stilted, error-ridden prose surrounding unidiomatic and broken code. For his trouble, he got an hourly wage that would have represented a slight raise over what I made mowing my neighbors’ lawns in high school.

Recently, I was invited to review a book under even more outrageous terms. The project in question was the second edition of a book on a subject I understand pretty well.1 I didn’t respond to their first invitation to review because:

  • I assumed they’d cast a fairly wide net;
  • my spare time is pretty well saturated these days and reviewing technical prose is a lot of slow, precise work;
  • in my opinion, this publisher releases a lot of subpar products and I don’t want my name and reputation associated with a mediocre book;
  • the earlier edition of this book had received extremely poor reviews; and
  • the compensation offered was simply risible: credit in the frontmatter of the finished book, a paper or ebook copy of the finished book, and an additional ebook of my choice.

Now, there are always tradeoffs involved in choosing contract work: one can weigh compensation against loving one’s work, having pride in products, having a prestigious position, or enjoying a low-stress work environment. But I’m pretty sure I know what the skill set that would enable someone to be a competent reviewer of a book like this one is worth, and the people who have it aren’t likely to be motivated to volunteer for a for-profit corporation in exchange for credit and free ebooks. Personally, a demand on my spare time that depends on my professional expertise (and takes away from time I might spend with my family, bicycling, catching up on sleep, or learning something new) but offers no compensation is not particularly compelling, and it becomes far less compelling when it seems like an opportunity to be involved with a product that I’m not likely to believe in and that I probably wouldn’t be proud to put my name on. So when the publisher sent me a rather snippy message following up, I wrote back, gently indicating that I would not be able to do the review and that they’d likely have trouble finding a qualified reviewer who was willing to work for free.

One wonders what publishers expect to accomplish with prepress technical reviews. In Jones’s case, he provided a lot of feedback and suggested that Apress not proceed with publishing Practical OCaml without major revisions; they ignored his objections but used his name in the final text. (At least he was paid something, I guess.) Not every publisher works this way, but the ones that do seem more interested in shifting the blame for bogus content to an external contractor than in releasing a thoroughly vetted product.

Requests to do specialized work for free are almost always insulting. However, the apparent value that some publishers ascribe to prepress review is even more insulting if you consider it from the perspective of the technical-book consumer!


  1. In fact, this publisher had invited me to write a book for them on this topic earlier this year, apparently before deciding to revise their existing offering instead.

I’ll be speaking later this afternoon at ApacheCon EU. The title of my talk is “Iteratively Improving Spark Application Performance.” The great thing about Apache Spark is that simple prototype applications are very easy to develop, and even a first attempt at realizing a new analysis will usually work well enough so that it’s not frustrating to evaluate it on real data. However, simple prototypes can often exhibit performance problems that aren’t obvious until you know where to look.

In this talk, we’ll introduce Spark’s execution model and discuss how to use Spark effectively. I’ll use a prototype implementation of my bike data analytics application as a running example and will present four general principles to keep in mind when writing efficient Spark applications, complete with detailed code explanations. My slides are available here as a PDF.

If you’re interested in more detail about the bike data analytics application, you can watch a brief video demo or watch a video of my talk about this application from Spark Summit earlier this year. Finally, I’ve published a blog post covering similar principles for improving Spark applications, which will be a useful reference whether or not you’re able to attend the talk.1

If you’re also at ApacheCon today, I hope to see you at 15:50 CET!


  1. The talk will feature visual explanations and other material that are not in that post, but the post has links to full code examples suitable for off-line experimentation.

My last post covered some considerations for using Spark SQL on a real-world JSON dataset. In particular, schema inference can suffer when you’re ingesting a dataset of heterogeneous objects. In this post, I’d like to sketch out some ways to connect schema inference to type inference, in order to point to automated solutions to some of the problems we’ve seen.

Background

We’ll start out by reviewing some definitions from the world of type systems:

  • a type is a range of values.
  • a product type is a compound type that describes structures with multiple values of (possibly) different types (e.g., tuples or records). You can think of a product type as being like the Cartesian product of several other types: if we have types A and B, the type A × B (i.e., the type of pairs of one value in A and one value in B) can represent the Cartesian product of A and B.
  • a sum type is a compound type that represents the disjoint union of several other types. Given types A and B, the type A + B represents the sum of A and B (i.e., any value in A or any value in B).

In Scala terms, a simple example of a product type is a tuple:

tuple.scala
// StringIntProduct is a product type:  String x Int
type StringIntProduct = Pair[String, Int]

and a simple example of a sum type is a basic linked list:

intConsCell.scala
// IntList is a sum type:  IntCons + EmptyList
abstract class IntList {}
case class IntCons(car: Int, cdr: IntList) extends IntList {}
case object EmptyList extends IntList {}

Note that, in the case of the linked list (as with other standard recursive data structures), we’re actually looking at a sum type in which one of the summed types is a product type: an IntList can either be an EmptyList or a cons cell, consisting of a pair of an Int value (the head of the list) and an IntList (the tail of the list).

Also note that, in Scala, sum types are tagged. You can statically distinguish between cons cells and the empty list in a case match.1 C programmers are familiar with a related kind of sum type: the untagged union, which describes a structure that can hold one value that can be of several different types. If you have a C union, you’ll need to explicitly track what kind of value it’s holding since you won’t be able to tell what kind of value it contains statically.
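
For example, a function over the IntList type above can branch on the tag in a case match, and the compiler knows which fields are available in each branch:

// the compiler checks that both summands of IntList are handled
def length(l: IntList): Int = l match {
  case EmptyList => 0
  case IntCons(_, rest) => 1 + length(rest)
}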

Schema inference

Given a collection of JSON objects,2 a schema inference algorithm produces a type that is the supertype of every object in the collection. (This is an interesting problem only if the collection contains objects of different types.) As an example, consider the following two objects:

bicycle-and-car.js
{ "gears" : 20, "pedals" : "spd-sl", "handlebars" : "drop" }
{ "gears" : 5, "abs" : true, "fuel" : "unleaded" }

The first models some characteristics of a bicycle and the second models some characteristics of a car. Both have gears, but bicycles have data about what kind of pedals and handlebars they include; cars have data about whether or not they include anti-lock brakes (abs) and specify what kind of fuel they expect. If we ask Spark SQL to infer a schema for a collection including these two objects, we’ll get something like this:

root
 |-- abs: boolean (nullable = true)
 |-- fuel: string (nullable = true)
 |-- gears: integer (nullable = true)
 |-- handlebars: string (nullable = true)
 |-- pedals: string (nullable = true)

Given a collection of objects with types T₁, T₂, ⋯, Tₓ, what we’re trying to get is the sum type T₁ + T₂ + ⋯ + Tₓ. The way we get this type is as an untagged union: we get an object that has the union of all of the fields of T₁, all of the fields of T₂, and so on. (Put another way, we’re really treating a heterogeneous collection as a union of homogeneous collections and identifying its schema as the union of the schemas for each included homogeneous collection.) Just as if we were using a C union, we’d have to figure out whether or not each object was a bicycle or a car. Fortunately, since we’re in a safe-language context, we’d know that an object with a value in the pedals field wouldn’t have a value in the abs field!

It should be clear that this is a totally reasonable approach, since it combines a straightforward implementation with a pleasant and unsurprising user experience: by adding a simple filter to my queries, I can select only objects of a given type from a heterogeneous collection. For example, if I want to find the mean number of gears for every bicycle with drop handlebars, I could write the following query (assuming I’d registered my SchemaRDD as a table called vehicles):

SELECT AVG(gears) FROM vehicles WHERE handlebars = 'drop'

Since handlebars will be NULL for every car, this query will only consider bicycles. Other collections may not admit such straightforward differentiation between object kinds, of course.

Problems and possible solutions

Tagging diverging types

As I mentioned last week, things begin to break down when we attempt to construct the sum of types that have the same field names but diverging types for those fields, as in the following example:

{ "branches": ["foo"] }
{ "branches": [{ "foo":42 }] }

A few days ago, Spark would have failed to infer a schema for this collection of objects, but open-source development moves quickly and now it will handle the conflicting types by giving users a schema in which the branches field contains an array of strings (and in which every object in the collection has its branches field cast to an array of strings).

In general, it seems like whatever technique we might adopt for schema inference should prefer imprecise results to a crash. But are there ways Spark could do a better job of inferring schemas for heterogeneous collections with diverging field types? I believe there might be, and part of the answer is going from untagged unions to tagged unions.

The fedmsg data set is a heterogeneous collection of objects; in some of these objects, the branches field models one concept as an array of strings, and in others, it models a different concept as an array of objects. I solved this problem by preprocessing the JSON to rename the different branches fields — in effect, dividing the branches fields into two tagged possibilities, like this. We could imagine that Spark could handle schema divergence by automatically identifying equivalence classes of field types, perhaps naming them with an integer subscript (e.g., branches_1 and branches_2) or with some abbreviated identifier for the value type, as follows:

{ "branches_as": ["foo"] }
{ "branches_ao": [{ "foo":42 }] }

Here’s an alternative way we could make the tagging explicit, by wrapping the diverging field’s values in an object:

{ "branches": { "_as" : ["foo"] } }
{ "branches": { "_ao" : [{ "foo":42 }] } }

The advantage to this tagged-union approach is that it makes it possible to write queries that handle heterogeneous collections. The disadvantage is that the path from the schema root to the tagged value will be different from the path to that value in the schemas of any of the objects in the original heterogeneous collection. The schema change may be surprising to the user, but — by definition — any approach to derive a fixed schema for a collection of objects with diverging schemas will not result in a schema that is totally compatible with the objects in the original collection.

Identifying maps

The other problem I observed related to encoding maps as JSON objects, as in the following example where we have a map from bicycle names to vehicle objects:

{
  "cx" : { "gears" : 20, "pedals" : "spd", "handlebars" : "drop" },
  "ss 29er" : { "gears" : 1, "pedals" : "spd", "handlebars" : "flat" },
  "del norte" : { "gears" : 1, "pedals" : "spd", "handlebars" : "drop" },
  "allez" : { "gears" : 20, "pedals" : "spd-sl", "handlebars" : "drop" }
}

The inferred schema for a collection including an object like this will have a distinct field for every key in every map. In this case, we don’t have an immediately great solution from the broader world of type systems. However, it seems that some heuristic — for example, “if an object has more than n fields and each has a value of the same type, it is a map” — might be able to do a good job of identifying objects that model maps.

In any case, it is possible for the user to preprocess their objects in order to encode maps in a different way if that will make it easier to query the data after ingest.
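
Here’s a rough sketch of that heuristic over json4s values (the threshold of four fields is arbitrary):

import org.json4s._

// "an object with more than n fields, each with a value of the same type, is probably a map"
def looksLikeMap(obj: JObject, n: Int = 4): Boolean = {
  val valueTypes = obj.obj.map { case (_, v) => v.getClass }
  valueTypes.size > n && valueTypes.distinct.size == 1
}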

Final thoughts

Spark SQL’s support for schema inference is an extremely cool feature, although it obviously isn’t able to work magic by ascribing rigid structure to heterogeneous collections of ad hoc objects. Some minor improvements could make inferred schemas more useful for these kinds of collections, though.


  1. Scala doesn’t support untagged union types directly — although there are clever ways to encode them in the type system — and Scala won’t actually be able to infer sum types from collections of untagged values: the inferred type of Array("foo", 1) is Array[Any], not Array[String + Int].

  2. (Slightly) more precisely: objects are defined recursively as unordered collections of key-value pairs, where keys are strings and values can be numbers, booleans, strings, objects, or arrays of values.

In this post, I’ll briefly introduce fedmsg, the federated message bus developed as part of the Fedora project’s infrastructure, and discuss how to ingest fedmsg data for processing with Spark SQL. While I hope you’ll find the analytic possibilities of fedmsg data as interesting as I do, this post will also cover some possible pitfalls you might run into while ingesting complex JSON data or the contents of large SQL databases into Spark SQL more generally.

Background

fedmsg is one of the coolest things about Fedora’s infrastructure.1 Because nearly everything that happens in Fedora is logged to a messaging bus, there is a vast amount of public data about the daily operation of a large and thriving open-source community, all in one place.2 The whole list of message topics published on the Fedora project’s fedmsg bus are documented here, but these examples should give you a taste of the kinds of messages available for inspection:

  • when a Fedora contributor updates the package build metadata (including RPM specfile, patches, source tarball list, etc.) in git,3
  • when a package build is started (and then again when the package build succeeds or fails, and when the package build is pushed out to the distribution),
  • when a Fedora community member tags a package with keywords,
  • when a blog post is syndicated to Fedora Planet,
  • when a wiki page is created or updated,
  • when an IRC meeting starts or ends,

and so on. Furthermore, these data are accessible either as a stream of messages (as they are generated), as JSON-encoded historical data via a REST interface, or as a PostgreSQL dump of all historical fedmsg activity.

An exhaustive treatment of fedmsg is outside the scope of this post,4 but I wanted to make it clear that

  • open infrastructure for a very large open-source project is an extremely cool thing,
  • there is a lot of interesting data available in fedmsg,
  • there have been some cool applications built atop fedmsg already (see Fedora Badges for an Xbox-like “achievements” system or Ralph Bean’s animated visualization for a couple of fun examples), and
  • there are a lot of possibilities for making sense of this data using Spark, Spark SQL, and Spark Streaming.

Getting the historical data

I wanted to look at a large amount of data to start with, so I began by grabbing a dump of the datanommer database. (Datanommer is a service that records each message on the fedmsg bus in a SQL database.) The nightly dump I got was about 3 GB compressed and expanded to about 40 GB — certainly not “big data,” but too big to fit in main memory on my workstation. I loaded the dump into a new Postgres database and started inspecting it.

The largest table by far is messages,5 which has the following schema:

CREATE TABLE messages (
    id integer NOT NULL,
    i integer NOT NULL,
    "timestamp" timestamp without time zone NOT NULL,
    certificate text,
    signature text,
    topic text,
    _msg text NOT NULL,
    category text,
    source_name text,
    source_version text,
    msg_id text
);

There are a couple of things to note here:

  • the certificate and signature fields account for the bulk of the raw data, and
  • the _msg field actually contains the string representation of a topic-specific JSON object.

As we’ll see, each of these can lead to some pitfalls when attempting to ingest the data into Spark.

Pitfall 1: Single-node memory limits and Slick projections

Here’s the core of a very simple program that ingests fedmsg records from a local PostgreSQL database, using Slick, and outputs these to JSON files,6 one for each record,7 in a local directory (I’m not reproducing the generated code that models datanommer tables and rows here):

Importer.scala
package com.freevariable.copricapo

import scala.slick.direct._
import scala.slick.direct.AnnotationMapper._
import scala.slick.driver.PostgresDriver
import scala.slick.driver.PostgresDriver.simple._
import scala.slick.jdbc.StaticQuery.interpolation

import scala.reflect.runtime.universe

import com.freevariable.copricapo.schema._

object Importer {
  import org.json4s._
  import org.json4s.ext._
  import org.json4s.JsonDSL._
  import org.joda.time.DateTime
  import org.json4s.jackson.JsonMethods.{parse => parseJson, render => renderJson, compact}

  object JSONImplicits {
    private def tupleIfPresent[T](name: String, v: Option[T])
      (implicit cnv: T => JValue): JObject =
      v.map(some => (name -> some) ~ JObject()) getOrElse JObject()

    implicit def message2json(m: Tables.MessagesRow) = {
      ("id" -> m.id) ~
      ("i" -> m.i) ~
      ("timestamp" -> (new DateTime(m.timestamp.getTime()).toString)) ~
      tupleIfPresent("topic", m.topic) ~
      tupleIfPresent("category", m.category) ~
      ("msg" -> parseJson(m._Msg)) ~
      tupleIfPresent("msg_id", m.msgId) ~
      tupleIfPresent("source_name", m.sourceName) ~
      tupleIfPresent("source_version", m.sourceVersion)
    }
  }

  def apply(dburl: String, output_dir: String, limit: Int = 0) {
    import JSONImplicits._

    new java.io.File(output_dir).mkdir()

    Database.forURL(dburl, driver="org.postgresql.Driver") withSession { implicit session =>
      val messages = TableQuery[Tables.Messages]

      (if (limit == 0) messages else messages.take(limit)) foreach { m =>
        val id = m.id
        val outputFile = s"$output_dir/$id.json"
        println(s"rendering to $outputFile")
        val pw = new java.io.PrintWriter(new java.io.File(outputFile))
        pw.println(compact(renderJson(m)))
        pw.close
      }
    }
  }
}

Note that the message2json implicit conversion doesn’t do anything with the certificate and signature fields. These fields are necessary for critical production applications because anyone can write to the fedmsg bus. However, I’m not planning to make any irreversible decisions based on my analyses of fedmsg data and am thus not currently interested in actually verifying message contents or provenance. Since I’m also not currently interested in wasting memory or performance, I’m going to strip these out of the generated JSON.

Unfortunately, even shipping these fields from the database to the JVM only to ignore them dramatically impacts the performance and memory footprint of our importer, so we’ll want to project these away sooner. The best solution I found for efficiently projecting away nullable fields from a SQL database accessed via Slick involved modifying the generated code to control how case classes were instantiated from the database. The generated code includes definitions of case classes for rows in each table, like this:

  case class MessagesRow(id: Int, i: Int, timestamp: java.sql.Timestamp,
    certificate: Option[String], signature: Option[String],
    topic: Option[String], _Msg: String, category: Option[String],
    sourceName: Option[String], sourceVersion: Option[String], msgId: Option[String])

The generated code also contains classes describing each table; these include a * method defining the default projection from a tuple to a row-specific case class. In our case, the default projection for the messages table looks like this:

  class Messages(tag: Tag) extends Table[MessagesRow](tag, "messages") {
    def * = (id, i, timestamp, certificate, signature, topic, _Msg, category, sourceName, sourceVersion, msgId) <>
      (MessagesRow.tupled, MessagesRow.unapply)

    // (remainder of class elided)
  }

If we modify it, we can ignore certificate and signature (essentially returning SQL NULL values instead of enormous encoded objects for these fields):

def * = (id, i, timestamp, None, None, topic, _Msg, category, sourceName, sourceVersion, msgId) <>
  (MessagesRow.tupled, MessagesRow.unapply)

Now we can fire up a REPL and produce JSON documents from the datanommer data; the following example will take the first 100 rows PostgreSQL gives us and generate documents for them in /tmp/scratch:

com.freevariable.copricapo.Importer("jdbc:postgresql:datanommer", "/tmp/scratch", 100)

Pitfall 2: The Lost Caverns (of type inference)

Spark SQL supports Hive-style records and arrays within relational data. An especially cool feature of Spark SQL, though, is that it can infer a HiveQL schema from a collection of JSON documents with complex nested structure (see the jsonFile method of SQLContext, which will operate on a single JSON document or a directory of JSON documents).

If you have a clone of the Spark repository on your machine, the easiest way to try this out is to launch sbt hive/console from within your local clone. You’ll then be able to build a schema from some JSON documents:

// sparkContext is predefined in the Spark Hive console
val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)

// /tmp/scratch contains JSON documents
val docs = sqlContext.jsonFile("/tmp/scratch")

// Inspect the schema
docs.printSchema

// Register it as a SQL table
docs.registerAsTable("documents")

// Get the first hundred rows as a Scala array
sqlContext.sql("select * from documents limit 100").collect

// Store a copy as a Parquet file
docs.saveAsParquetFile("/tmp/scratch.parquet")

JSON is convenient for users because it lets us specify untyped objects with no fixed schema. Unfortunately, JSON lets us specify untyped objects with no fixed schema. This means that Spark might have trouble inferring a fixed schema for our data. Consider how type inference works in real programming languages: if a variable can take one of two possible values, its type must be the unification of their types. Sometimes, this means we get a trivial typing (in Scala terms, Any), but in other cases, it means we get a contradiction (in Scala terms, Nothing). When Spark is inferring types for our documents, it will simply raise an exception if it tries to unify two objects with diverging types, as you can see in the following example:

val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
val diverging = sparkContext.parallelize(List("""{"branches": ["foo"]}""", """{"branches": [{"foo":42}]}"""))
sqlContext.jsonRDD(diverging)  // throws a MatchError

The two JSON objects each contain a field called branches, but in the former it is an array of strings, while in the latter it is an object. In the case of fedmsg data, this problem came up because different message topics used branches to correspond to different (but related) concepts. If we could rename these different cases, we could avoid the diverging types. Since the JSON library we’re using has a facility to transform JSON values based on partial functions, we can do this easily:

  def renameBranches(msg: JValue) = {
    msg transformField {
      case JField("branches", v @ JArray(JString(_)::_)) => ("pkg_branches", v)
      case JField("branches", v @ JArray(JObject(_)::_)) => ("git_branches", v)
    }
  }

  def transformObject(record: JValue) = {
    record transformField {
      case JField("_msg", msg) => ("_msg", renameBranches(msg))
    }
  }

The renameBranches method above will simply translate fields named branches with array-of-strings values to fields named pkg_branches with the same values and translate fields named branches with array-of-objects values to fields named git_branches with the same values. Other fields are ignored by this code.

Pitfall 3: Too many columns

A related problem comes up when we have JSON objects who use field names as entity names, like in the following example (taken from a message describing a git commit):

{
  "stats" : {
    "files" : {
      ".gitignore" : { "deletions" : 0, "insertions" : 1, "lines" : 1 },
      "cmake-fedora.spec" : { "deletions" : 3, "insertions" : 3, "lines" : 6 },
      "sources" : { "deletions" : 1, "insertions" : 1, "lines" : 2 }
    },
    "total" : { "deletions" : 4, "files" : 3, "insertions" : 5, "lines" : 9 }
  }
}

Note that files is essentially a collection of objects describing different changed files in the commit (.gitignore, cmake-fedora.spec, and sources), but it is represented as an object with a field for each changed file. Because of this, the inferred schema will have a field in the stats.files object for every single filename that has ever been committed to Fedora’s git repository! (That’s a lot of NULL values in a typical row.)

There are other similar cases in the fedmsg data as well where entity names become field names; these can all be handled by transformation rules in fairly straightforward fashion.
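
For example, a rule in the same style as renameBranches above might lift the per-file statistics into an array of objects with an explicit filename field (the filename key is my choice for illustration, not something in the fedmsg data):

// rewrite {"files": {".gitignore": {...}, ...}} as
// {"files": [{"filename": ".gitignore", ...}, ...]} so the inferred schema
// gets a single array-valued field rather than one column per filename
def liftFileStats(record: JValue) = {
  record transformField {
    case JField("files", JObject(perFile)) =>
      ("files", JArray(perFile.collect { case (filename, JObject(stats)) =>
        JObject(("filename" -> JString(filename)) :: stats)
      }))
  }
}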

What’s next

This post should have enough background to help you get started looking at fedmsg data with Spark SQL. In future posts, I’ll discuss some additional correctness and efficiency considerations; demonstrate how to unify warehoused, batched, and streaming data; and introduce some analytic applications involving fedmsg data.


  1. I’m most familiar with Fedora, but the Debian community has also begun to use fedmsg to publish events about their infrastructure.

  2. I’ve recently been reading Jay Kreps’ I ♥ Logs, in which Kreps argues (from principle and from experience) for the centrality of the log abstraction in distributed systems. I may write a longer review later, but the argument is sound (and consonant with some of the arguments initially made for basing Fedora’s infrastructure around a messaging bus) and the book is definitely worth checking out!

  3. See the distgit source for the fedmsg package for an example.

  4. For more information, read the docs or check out the slides from Ralph Bean’s client-oriented or overview presentations.

  5. There are also tables to store the names of users, the names of packages, and to model the relationships between users or packages and fedmsg messages involving those users or packages.

  6. I’m using JSON here because of the rich structure of the topic-specific messages. If you were ingesting flat relational data, it would almost certainly make more sense to use JdbcRDD and do the ingest within Spark.

  7. Please don’t use the above code in production; it’s intended to be simple, not efficient. Producing one file per record makes it easy to see where things are going wrong, but it’s very slow both for this program and for the Spark jobs that you’ll want to write to read these data. Your local filesystem probably won’t be too happy with you, either.

I went to Strata + Hadoop World last week. This event targets a pretty broad audience and is an interesting mix of trade show, data science conference, and software conference. However, I’ve been impressed by the quality and relevance of the technical program both of the years that I’ve gone. The key to finding good talks at this kind of event is to target talks focusing on applications, visualization, and fundamental techniques, rather than ostensibly technical talks.1 In the rest of this post, I’ll share some of my notes from the talks and tutorials I enjoyed the most.

D3.js

The first event I attended was Sebastian Gutierrez’s D3 tutorial. (Gutierrez’s slides and sample code are available online.) A great deal of this session was focused on introducing JavaScript and manipulating the DOM with D3, which was great for me; I’ve used D3 before but my knowledge of web frontend development practice is spotty and ad-hoc at best.

This was a great tutorial: the pace, scope, content, and presentation were all really excellent. It provided enough context to make sense of D3’s substantial capabilities and left me feeling like I could dive in to using D3 for some interesting projects. Finally, Gutierrez provided lots of links to other resources to learn more and experiment, both things I’d seen, like bl.ocks.org, and things that were new to me, like tributary.io and JSFiddle.

The standalone D3 tutorial on DashingD3JS.com seems like an excellent way to play along at home.

Engineering Pipelines for Learning at Scale

I attended the “Hardcore Data Science” track in the afternoon of the tutorial day; there were several interesting talks but I wanted to call out in particular Ben Recht’s talk, which mentioned BDAS in the title but was really about generic problems in developing large-scale systems that use machine learning.

Recht’s talk began with the well-known but oft-ignored maxim that machine learning is easy once you’ve turned your analysis problem into an optimization problem. I say “oft-ignored” here not because I believe practitioners are getting hung up on the easy part of the problem, but because it seems like raw modeling performance is often the focus of marketing for open-source and commercial projects alike.2 The interesting engineering challenges are typically in the processing pipeline from raw data to an optimizable model: acquiring data, normalizing and pruning values, and selecting features all must take place before modeling and classification. Learning problems often have this basic structure; Recht provided examples of these pipeline stages in object recognition and text processing domains.

The main motivating example was Recht’s attempt to replicate the Oregon State Digital Scout project, which analyzed video footage of American football and answered questions such as which plays were likely from a given offensive formation. Recht’s team set out to stitch together video footage into a panorama, translate from video coordinates to inferred field coordinates, and track player positions and movements. This preprocessing code, which they had implemented with standard C++ and the OpenCV library, took ten hours to analyze video of a four-second NFL play. Expert programmers could surely have improved the runtime of this code substantially, but it seems like we should be able to support software engineering of machine-learning applications without having teams of experts working on each stage of the learning pipeline.

The talk concluded by presenting some ideas for work that would support software engineering for learning pipelines. These were more speculative but generally focused on using programming language technology to make it easier to develop, compose, and reason about learning pipelines.

Keynotes

Since the talks from the plenary sessions are all fairly brief (and freely available as videos), I’ll simply call out a couple that were especially enjoyable.

Domain-Specific Languages for Data Transformation

Joe Hellerstein and Sean Kandel of Trifacta gave a talk on domain-specific languages for data transformation in general and on their Wrangle system in particular. (Trifacta is also responsible for the extremely cool Vega project.) The talk led with three rules for successful data wrangling, which I’ll paraphrase here:

  • your processes, not your data, are most important;
  • reusable scripts to process data are more valuable than postprocessed and cleaned data; and
  • “agile” models in which preprocessing is refined iteratively in conjunction with downstream analysis of the processed data are preferable to having preprocessing take place as an isolated phase.3

The first act of the talk also referenced this quotation from Alfred North Whitehead, which set the tone for a survey of DSLs for data transformation and query construction:

By relieving the brain of all unnecessary work, a good notation sets it free to concentrate on more advanced problems…

In the 1990s, several data processing DSLs emerged that were based on second-order logic, proximity joins, and composable transformations.4 Hellerstein argued that, since these underlying formalisms weren’t any simpler to understand than the relational model, the DSLs weren’t particularly successful at simplifying data transformation for end-users.

Raman and Hellerstein’s Potter’s Wheel system presented a visual interface to SQL for data cleaning; it could automatically detect bad data and infer structure domains but wasn’t particularly easier to use than SQL: putting a visual, menu-driven interface over a language doesn’t generally lower the cognitive load of using that language.5

The Wrangler system (which forms the basis for the Wrangle DSL in Trifacta’s product) attacks the problem from a different angle: data cleaning operations are suggested based on explicit user guidance, semantic information about types and domains, and inferences of user intent based on prior actions. The entire problem of suggesting queries and transformations is thus expressible as an optimization problem over the space of DSL clauses. With the interface to the Wrangle DSL, the user highlights features in individual rows of the data to appear in the transformed data or in a visualization, and the system presents several suggested queries that it had inferred. The Wrangle system also infers regular expressions for highlighted text transformations, but since regular expressions — and especially machine-generated ones — are often painful to read, it translates these to a more readable (but less expressive) representation before presenting them to the user.

Kandel gave a very impressive live demo of Wrangle on a couple of data sets, including techniques for sensibly treating dirty or apparently-nonsensical rows (in this example, the confounding data included negative contribution amounts from election finance data, which represented refunds or revised accounting). This is excellent work at the intersection of databases, programming languages, machine learning, and human-computer interaction — which is a space that I suspect a lot of future contributions to data processing will occupy.

Clustering for Anomaly Detection

Sean Owen’s talk on using Spark and k-means clustering for anomaly detection was absolutely packed. Whether this was more due to the overwhelming popularity of talks with “Spark” in the title or because Sean is widely known as a sharp guy and great presenter, I’m not sure, but it was an excellent talk and was about more than just introducing clustering in Spark. It was really a general discussion of how to go from raw data to a clustering problem in a way that would give you the most useful results — Spark was essential to make the presentation of ETL and clustering code simple enough to fit on a slide, but the techniques were generally applicable.

The running example in this talk involved system log data from a supervised learning competition. Some of the log data was generated by normal activity and some of it was generated by activity associated with various security exploits. The original data were labeled (whether as “normal” or by the name of the associated exploit), since they were intended to evaluate supervised learning techniques. However, Owen pointed out that we could identify anomalies by clustering points and looking at ones that were far away from any cluster center. Since k-means clustering is unsupervised, one of Owen’s first steps was to remove the labels.
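
Here’s a minimal sketch (mine, not Owen’s code) of what that first step might look like: parse comma-separated log records, drop the trailing label, and keep the remaining numeric fields as an MLlib vector. The input path and field layout here are assumptions for illustration.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// Sketch: turn raw, labeled log lines into unlabeled feature vectors.
// Assumes comma-separated records with the label in the last column.
def unlabeledVectors(sc: SparkContext, path: String): RDD[Vector] =
  sc.textFile(path).map { line =>
    val fields = line.split(',')
    // drop the label and keep only the fields that parse as doubles
    val numeric = fields.init.flatMap { f => scala.util.Try(f.toDouble).toOption }
    Vectors.dense(numeric)
  }
```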

With code to generate an RDD of unlabeled records, Owen then walked through the steps he might use if he were using clustering to characterize these data:

  • The first question involves choosing a suitable k. Since Owen knew that there were 23 different kinds of records (those generated by normal traffic as well as by 22 attack types), it seems likely that there would be at least 23 clusters. A naïve approach to choosing a cluster count (viz., choosing a number that minimizes the total distance from points to their nearest centroids) falls short in a couple of ways. First, since cluster centers are originally picked randomly, the results of finding k clusters may not be deterministic. (This is easy to solve by ensuring that the algorithm runs for a large number of iterations and has a small distance threshold for convergence.) More importantly, though, finding n clusters, where n is the size of the population, would be optimal under this metric, but it would tell us nothing.
  • The vectors in the raw data describe 42 features, but the distance between them is dominated by two values: bytes read and bytes written, which are relatively large integer values (as opposed to many of the features, which are booleans encoded as 0 or 1). Normalizing each feature by its z-score made each feature contribute equally to distance. (This normalization and the encoding trick from the next item are sketched in code after this list.)
  • Another useful trick is encoding features whose space is a finite domain of n elements as n boolean features (where only one of them will be true). So if your feature could be either apple, banana, cantaloupe, or durian in the raw data, you could encode it as four boolean-valued features, one of which is true if and only if the feature is apple, one of which is true if and only if the feature is banana, and so on.
  • Using entropy (or normalized mutual information) to evaluate the clustering can be a better bet, since fitness by entropy won’t keep improving all the way to k = n the way fitness by Euclidean distance does. Another option is restoring the labels from the original data and verifying that clusters typically have homogeneous labels.
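
To make the list above more concrete, here’s a hedged sketch of how a few of these pieces might look with Spark’s RDD-based MLlib API: z-score normalization via StandardScaler, a tiny one-hot encoding helper for categorical fields, and a scan over candidate values of k scored by the sum of squared distances to the nearest centroid. None of this is Owen’s code; names and parameters are illustrative.

```scala
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// z-score each feature so that large-magnitude fields (bytes read/written)
// don't dominate Euclidean distance
def normalize(data: RDD[Vector]): RDD[Vector] = {
  val scaler = new StandardScaler(withMean = true, withStd = true).fit(data)
  data.map(v => scaler.transform(v))
}

// one-hot encode a categorical value drawn from a known, finite domain
def oneHot(value: String, domain: Seq[String]): Array[Double] =
  domain.map(d => if (d == value) 1.0 else 0.0).toArray

// score a clustering by the sum of squared distances to the nearest center;
// as noted above, this metric keeps improving as k grows, so it's a rough
// guide rather than a way to pick the "right" k automatically
def costForK(data: RDD[Vector], k: Int): Double = {
  val model = KMeans.train(data, k, 100 /* max iterations */)
  model.computeCost(data)
}

// e.g., (20 to 100 by 10).map { k => (k, costForK(normalizedData, k)) }
```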

Summary

Many of the talks I found interesting featured at least some of the following common themes: using programming-language technology, improving programmer productivity, or focusing on the pipeline from raw data to clean feature vectors. I especially appreciated Sean Owen’s talk for showing how to refine clustering over real data and demonstrating substantial improvements by a series of simple changes. As Recht reminds us, the optimization problems are easy to solve; it’s great to see more explicit focus on making the process that turns analysis problems into optimization problems as easy as possible.


  1. Alas, talks with technical-sounding abstracts often wind up having a nontrivial marketing component!

  2. This is related to the reason why “data lakes” have turned out to be not quite as useful as we might have hoped: a place to store vast amounts of sparse, noisy, and irregular data and an efficient way to train models from these data are necessary but not sufficient conditions for actually answering questions!

  3. Hellerstein pointed out that the alternative to his recommendation is really the waterfall model (seriously, click that link), which makes it all the more incredible that anyone follows it for data processing. Who would willingly admit to using a practice initially defined as a strawman?

  4. SchemaSQL and AJAX were provided as examples.

  5. I was first confronted with the false promise of visual languages in the 1980s. I had saved my meager kid earnings for weeks and gone to HT Electronics in Sunnyvale to purchase the AiRT system for the Amiga. AiRT was a visual programming language that turned out to be neither more expressive nor easier to use than any of the text-based languages I was already familiar with. With the exception of this transcript of a scathing review in Amazing Computing magazine (“…most of [its] potential is not realized in the current implementation of AiRT, and I cannot recommend it for any serious work.”), it is now apparently lost to history.