I’ll be speaking later this afternoon at ApacheCon EU. The title of my talk is “Iteratively Improving Spark Application Performance.” The great thing about Apache Spark is that simple prototype applications are very easy to develop, and even a first attempt at realizing a new analysis will usually work well enough so that it’s not frustrating to evaluate it on real data. However, simple prototypes can often exhibit performance problems that aren’t obvious until you know where to look.

In this talk, we’ll introduce Spark’s execution model and discuss how to use Spark effectively. I’ll use a prototype implementation of my bike data analytics application as a running example and will present four general principles to keep in mind when writing efficient Spark applications, complete with detailed code explanations. My slides are available here as a PDF.

If you’re interested in more detail about the bike data analytics application, you can watch a brief video demo or watch a video of my talk about this application from Spark Summit earlier this year. Finally, I’ve published a blog post covering similar principles for improving Spark applications, which will be a useful reference whether or not you’re able to attend the talk.1

If you’re also at ApacheCon today, I hope to see you at 15:50 CET!


  1. The talk will feature visual explanations and other material that are not in that post, but the post has links to full code examples suitable for off-line experimentation.

My last post covered some considerations for using Spark SQL on a real-world JSON dataset. In particular, schema inference can suffer when you’re ingesting a dataset of heterogeneous objects. In this post, I’d like to sketch out some ways to connect schema inference to type inference, in order to point to automated solutions to some of the problems we’ve seen.

Background

We’ll start out by reviewing some definitions from the world of type systems:

  • a type is a range of values.
  • a product type is a compound type that describes structures with multiple values of (possibly) different types (e.g., tuples or records). You can think of a product type as being like the Cartesian product of several other types: if we have types A and B, the type A × B (i.e., the type of pairs of one value in A and one value in B) can represent the Cartesian product of A and B.
  • a sum type is a compound type that represents the disjoint union of several other types. Given types A and B, the type A + B represents the sum of A and B (i.e., any value in A or any value in B).

In Scala terms, a simple example of a product type is a tuple:

tuple.scala
// StringIntProduct is a product type:  String x Int
type StringIntProduct = Pair[String, Int]

and a simple example of a sum type is a basic linked list:

intConsCell.scala
// IntList is a sum type:  IntCons + EmptyList
abstract class IntList {}
case class IntCons(car: Int, cdr: IntList) extends IntList {}
case object EmptyList extends IntList {}

Note that, in the case of the linked list (as with other standard recursive data structures), we’re actually looking at a sum type in which one of the summed types is a product type: an IntList can either be an EmptyList or a cons cell, consisting of a pair of an Int value (the head of the list) and an IntList (the tail of the list).

Also note that, in Scala, sum types are tagged. You can statically distinguish between cons cells and the empty list in a case match.1 C programmers are familiar with a related kind of sum type: the untagged union, which describes a structure that can hold one value that can be of several different types. If you have a C union, you’ll need to explicitly track what kind of value it’s holding since you won’t be able to tell what kind of value it contains statically.
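To make the tagging concrete, here’s a small sketch (not from the original post) of a function that sums an IntList by matching on its tag; the compiler knows both alternatives of the sum type and can warn us if we miss one:

// Sum an IntList by matching on its tag: one case per summed type.
def sumList(xs: IntList): Int = xs match {
  case IntCons(car, cdr) => car + sumList(cdr) // cons cell: head plus sum of tail
  case EmptyList         => 0                  // empty list contributes nothing
}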

Schema inference

Given a collection of JSON objects,2 a schema inference algorithm produces a type that is the supertype of every object in the collection. (This is an interesting problem only if the collection contains objects of different types.) As an example, consider the following two objects:

bicycle-and-car.js
{ "gears" : 20, "pedals" : "spd-sl", "handlebars" : "drop" }
{ "gears" : 5, "abs" : true, "fuel" : "unleaded" }

The first models some characteristics of a bicycle and the second models some characteristics of a car. Both have gears, but bicycles have data about what kind of pedals and handlebars they include; cars have data about whether or not they include anti-lock brakes (abs) and specify what kind of fuel they expect. If we ask Spark SQL to infer a schema for a collection including these two objects, we’ll get something like this:

root
 |-- abs: boolean (nullable = true)
 |-- fuel: string (nullable = true)
 |-- gears: integer (nullable = true)
 |-- handlebars: string (nullable = true)
 |-- pedals: string (nullable = true)

Given a collection of objects with types T₁, T₂, ⋯, Tₓ, what we’re trying to get is the sum type T₁ + T₂ + ⋯ + Tₓ. The way we get this type is as an untagged union: we get an object that has the union of all of the fields of T₁, all of the fields of T₂, and so on. (Put another way, we’re really treating a heterogeneous collection as a union of homogeneous collections and identifying its schema as the union of the schemas for each included homogeneous collection.) Just as if we were using a C union, we’d have to figure out whether or not each object was a bicycle or a car. Fortunately, since we’d be in a safe-language context, we’d know that an object with a value in the pedals field wouldn’t have a value in the abs field!

It should be clear that this is a totally reasonable approach, since it combines a straightforward implementation with a pleasant and unsurprising user experience: by adding a simple filter to my queries, I can select only objects of a given type from a heterogeneous collection. For example, if I want to find the mean number of gears for every bicycle with drop handlebars, I could write the following query (assuming I’d registered my SchemaRDD as a table called vehicles):

SELECT AVG(gears) FROM vehicles WHERE handlebars = 'drop'

Since handlebars will be NULL for every car, this query will only consider bicycles. Other collections may not admit such straightforward differentiation between object kinds, of course.

Problems and possible solutions

Tagging diverging types

As I mentioned last week, things begin to break down when we attempt to construct the sum of types that have the same field names but diverging types for those fields, as in the following example:

{ "branches": ["foo"] }
{ "branches": [{ "foo":42 }] }

Until a few days ago, Spark would have failed to infer a schema for this collection of objects, but open-source development moves quickly, and it now handles the conflicting types by giving users a schema in which the branches field contains an array of strings (and in which every object in the collection has its branches field cast to an array of strings).

In general, it seems that whatever technique we might adopt, schema inference should prefer imprecise results to a crash. But are there ways Spark could do a better job of inferring schemas for heterogeneous collections with diverging field types? I believe there might be, and part of the answer is going from untagged unions to tagged unions.

The fedmsg data set is a heterogeneous collection of objects; in some of these objects, the branches field models one concept as an array of strings, and in others, it models a different concept as an array of objects. I solved this problem by preprocessing the JSON to rename the different branches fields — in effect, dividing the branches fields into two tagged possibilities, like this. We could imagine that Spark could handle schema divergence by automatically identifying equivalence classes of field types, perhaps naming them with an integer subscript (e.g., branches_1 and branches_2) or with some abbreviated identifier for the value type, as follows:

{ "branches_as": ["foo"] }
{ "branches_ao": [{ "foo":42 }] }

Here’s an alternative way we could make the tagging explicit, by wrapping the diverging field’s values in an object:

{ "branches": { "_as" : ["foo"] } }
{ "branches": { "_ao" : [{ "foo":42 }] } }

The advantage to this tagged-union approach is that it makes it possible to write queries that handle heterogeneous collections. The disadvantage is that the path from the schema root to the tagged value will be different from the path to that value in the schemas of any of the objects in the original heterogeneous collection. The schema change may be surprising to the user, but — by definition — any approach to derive a fixed schema for a collection of objects with diverging schemas will not result in a schema that is totally compatible with the objects in the original collection.
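For instance, assuming the wrapped encoding above were registered as a table called documents (a hypothetical name), a query that only wanted the array-of-strings variant would have to reach it through the new path, something like:

SELECT branches._as FROM documents WHERE branches._as IS NOT NULL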

Identifying maps

The other problem I observed was related to encoding maps as JSON objects, as in the following example, where we have a map from bicycle names to vehicle objects:

{
  "cx" : { "gears" : 20, "pedals" : "spd", "handlebars" : "drop" },
  "ss 29er" : { "gears" : 1, "pedals" : "spd", "handlebars" : "flat" },
  "del norte" : { "gears" : 1, "pedals" : "spd", "handlebars" : "drop" },
  "allez" : { "gears" : 20, "pedals" : "spd-sl", "handlebars" : "drop" }
}

The inferred schema for a collection including an object like this will have a distinct field for every key in every map. In this case, we don’t have an immediately great solution from the broader world of type systems. However, it seems that some heuristic — for example, “if an object has more than n fields and each has a value of the same type, it is a map” — might be able to do a good job of identifying objects that model maps.
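Here’s a rough sketch of what such a heuristic might look like over json4s values (this is my own illustration, not anything Spark does; the threshold and the notion of “same kind of value” are both assumptions):

import org.json4s._

// Guess whether a JSON object is "really" a map: many fields, all of whose
// values are built with the same JSON constructor (all objects, all numbers, ...).
def looksLikeMap(jobj: JObject, threshold: Int = 8): Boolean = {
  val valueKinds = jobj.obj.map { case (_, v) => v.getClass }.toSet
  jobj.obj.size > threshold && valueKinds.size == 1
}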

In any case, it is possible for the user to preprocess their objects in order to encode maps in a different way if that will make it easier to query the data after ingest.

Final thoughts

Spark SQL’s support for schema inference is an extremely cool feature, although it obviously isn’t able to work magic by ascribing rigid structure to heterogeneous collections of ad hoc objects. Some minor improvements could make inferred schemas more useful for these kinds of collections, though.


  1. Scala doesn’t support untagged union types directly — although there are clever ways to encode them in the type system — and Scala won’t actually be able to infer sum types from collections of untagged values: the inferred type of Array("foo", 1) is Array[Any], not Array[String + Int].

  2. (Slightly) more precisely: objects are defined recursively as unordered collections of key-value pairs, where keys are strings and values can be numbers, booleans, strings, objects, or arrays of values.

In this post, I’ll briefly introduce fedmsg, the federated message bus developed as part of the Fedora project’s infrastructure, and discuss how to ingest fedmsg data for processing with Spark SQL. While I hope you’ll find the analytic possibilities of fedmsg data as interesting as I do, this post will also cover some possible pitfalls you might run into while ingesting complex JSON data or the contents of large SQL databases into Spark SQL more generally.

Background

fedmsg is one of the coolest things about Fedora’s infrastructure.1 Because nearly everything that happens in Fedora is logged to a messaging bus, there is a vast amount of public data about the daily operation of a large and thriving open-source community, all in one place.2 The whole list of message topics published on the Fedora project’s fedmsg bus is documented here, but these examples should give you a taste of the kinds of messages available for inspection:

  • when a Fedora contributor updates the package build metadata (including the RPM specfile, patches, source tarball list, etc.) in git,3
  • when a package build is started (and then again when the package build succeeds or fails, and when the package build is pushed out to the distribution),
  • when a Fedora community member tags a package with keywords,
  • when a blog post is syndicated to Fedora Planet,
  • when a wiki page is created or updated,
  • when an IRC meeting starts or ends,

and so on. Furthermore, these data are accessible either as a stream of messages (as they are generated), as JSON-encoded historical data via a REST interface, or as a PostgreSQL dump of all historical fedmsg activity.

An exhaustive treatment of fedmsg is outside the scope of this post,4 but I wanted to make it clear that

  • open infrastructure for a very large open-source project is an extremely cool thing,
  • there is a lot of interesting data available in fedmsg,
  • there have been some cool applications built atop fedmsg already (see Fedora Badges for an Xbox-like “achievements” system or Ralph Bean’s animated visualization for a couple of fun examples), and
  • there are a lot of possibilities for making sense of this data using Spark, Spark SQL, and Spark Streaming.

Getting the historical data

I wanted to look at a large amount of data to start with, so I began by grabbing a dump of the datanommer database. (Datanommer is a service that records each message on the fedmsg bus in a SQL database.) The nightly dump I got was about 3 GB compressed and expanded to about 40 GB — certainly not “big data,” but too big to fit in main memory on my workstation. I loaded the dump into a new Postgres database and started inspecting it.

The largest table by far is messages,5 which has the following schema:

CREATE TABLE messages (
    id integer NOT NULL,
    i integer NOT NULL,
    "timestamp" timestamp without time zone NOT NULL,
    certificate text,
    signature text,
    topic text,
    _msg text NOT NULL,
    category text,
    source_name text,
    source_version text,
    msg_id text
);

There are a couple of things to note here:

  • the certificate and signature fields account for the bulk of the raw data, and
  • the _msg field actually contains the string representation of a topic-specific JSON object.

As we’ll see, each of these can lead to some pitfalls when attempting to ingest the data into Spark.

Pitfall 1: Single-node memory limits and Slick projections

Here’s the core of a very simple program that ingests fedmsg records from a local PostgreSQL database, using Slick, and outputs these to JSON files,6 one for each record,7 in a local directory (I’m not reproducing the generated code that models datanommer tables and rows here):

Importer.scala
package com.freevariable.copricapo

import scala.slick.direct._
import scala.slick.direct.AnnotationMapper._
import scala.slick.driver.PostgresDriver
import scala.slick.driver.PostgresDriver.simple._
import scala.slick.jdbc.StaticQuery.interpolation

import scala.reflect.runtime.universe

import com.freevariable.copricapo.schema._

object Importer {
  import org.json4s._
  import org.json4s.ext._
  import org.json4s.JsonDSL._
  import org.joda.time.DateTime
  import org.json4s.jackson.JsonMethods.{parse => parseJson, render => renderJson, compact}

  object JSONImplicits {
    private def tupleIfPresent[T](name: String, v: Option[T])
      (implicit cnv: T => JValue): JObject =
      v.map(some => (name -> some) ~ JObject()) getOrElse JObject()

    implicit def message2json(m: Tables.MessagesRow) = {
      ("id" -> m.id) ~
      ("i" -> m.i) ~
      ("timestamp" -> (new DateTime(m.timestamp.getTime()).toString)) ~
      tupleIfPresent("topic", m.topic) ~
      tupleIfPresent("category", m.category) ~
      ("msg" -> parseJson(m._Msg)) ~
      tupleIfPresent("msg_id", m.msgId) ~
      tupleIfPresent("source_name", m.sourceName) ~
      tupleIfPresent("source_version", m.sourceVersion)
    }
  }

  def apply(dburl: String, output_dir: String, limit: Int = 0) {
    import JSONImplicits._

    new java.io.File(output_dir).mkdir()

    Database.forURL(dburl, driver="org.postgresql.Driver") withSession { implicit session =>
      val messages = TableQuery[Tables.Messages]

      (if (limit == 0) messages else messages.take(limit)) foreach { m =>
        val id = m.id
        val outputFile = s"$output_dir/$id.json"
        println(s"rendering to $outputFile")
        val pw = new java.io.PrintWriter(new java.io.File(outputFile))
        pw.println(compact(renderJson(m)))
        pw.close
      }
    }
  }
}

Note that the message2json implicit conversion doesn’t do anything with the certificate and signature fields.
These fields are necessary for critical production applications because anyone can write to the fedmsg bus. However, I’m not planning to make any irreversible decisions based on my analyses of fedmsg data and am thus not currently interested in actually verifying message contents or provenance. Since I’m also not currently interested in wasting memory or performance, I’m going to strip these out of the generated JSON.

Unfortunately, even shipping these fields from the database to the JVM only to ignore them dramatically impacts the performance and memory footprint of our importer, so we’ll want to project these away sooner. The best solution I found for efficiently projecting away nullable fields from a SQL database accessed via Slick involved modifying the generated code to control how case classes were instantiated from the database. The generated code includes definitions of case classes for rows in each table, like this:

  case class MessagesRow(id: Int, i: Int, timestamp: java.sql.Timestamp,
    certificate: Option[String], signature: Option[String],
    topic: Option[String], _Msg: String, category: Option[String],
    sourceName: Option[String], sourceVersion: Option[String], msgId: Option[String])

The generated code also contains classes describing each table; these include a * method defining the default projection from a tuple to a row-specific case class. In our case, the default projection for the messages table looks like this:

  class Messages(tag: Tag) extends Table[MessagesRow](tag, "messages") {
    def * = (id, i, timestamp, certificate, signature, topic, _Msg, category, sourceName, sourceVersion, msgId) <>
      (MessagesRow.tupled, MessagesRow.unapply)

    // (remainder of class elided)
  }

If we modify it, we can ignore certificate and signature (essentially returning SQL NULL values instead of enormous encoded objects for these fields):

def * = (id, i, timestamp, None, None, topic, _Msg, category, sourceName, sourceVersion, msgId) <>
  (MessagesRow.tupled, MessagesRow.unapply)

Now we can fire up a REPL and produce JSON documents from the datanommer data; the following example will take the first 100 rows PostgreSQL gives us and generate documents for them in /tmp/scratch:

com.freevariable.copricapo.Importer("jdbc:postgresql:datanommer", "/tmp/scratch", 100)

Pitfall 2: The Lost Caverns (of type inference)

Spark SQL supports Hive-style records and arrays within relational data. An especially cool feature of Spark SQL, though, is that it can infer a HiveQL schema from a collection of JSON documents with complex nested structure (see the jsonFile method of SQLContext, which will operate on a single JSON document or a directory of JSON documents).

If you have a clone of the Spark repository on your machine, the easiest way to try this out is to launch sbt hive/console from within your local clone. You’ll then be able to build a schema from some JSON documents:

// sparkContext is predefined in the Spark Hive console
val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)

// /tmp/scratch contains JSON documents
val docs = sqlContext.jsonFile("/tmp/scratch")

// Inspect the schema
docs.printSchema

// Register it as a SQL table
docs.registerAsTable("documents")

// Get the first hundred rows as a Scala array
sqlContext.sql("select * from documents limit 100").collect

// Store a copy as a Parquet file
docs.saveAsParquetFile("/tmp/scratch.parquet")

JSON is convenient for users because it lets us specify untyped objects with no fixed schema. Unfortunately, JSON lets us specify untyped objects with no fixed schema. This means that Spark might have trouble inferring a fixed schema for our data. Consider how type inference works in real programming languages: if a variable can take values of two different types, its type must be the unification of those types. Sometimes, this means we get a trivial typing (in Scala terms, Any), but in other cases, it means we get a contradiction (in Scala terms, Nothing). When Spark is inferring types for our documents, it will simply raise an exception if it tries to unify two objects with diverging types, as you can see in the following example:

val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
val diverging = sparkContext.parallelize(List("""{"branches": ["foo"]}""", """{"branches": [{"foo":42}]}"""))
sqlContext.jsonRDD(diverging)  // throws a MatchError

The two JSON objects each contain a field called branches, but in the former it is an array of strings, while in the latter it is an array of objects. In the case of fedmsg data, this problem came up because different message topics used branches to correspond to different (but related) concepts. If we could rename these different cases, we could avoid the diverging types. Since the JSON library we’re using has a facility to transform JSON values based on partial functions, we can do this easily:

  def renameBranches(msg: JValue) = {
    msg transformField {
      case JField("branches", v @ JArray(JString(_)::_)) => ("pkg_branches", v)
      case JField("branches", v @ JArray(JObject(_)::_)) => ("git_branches", v)
    }
  }

  def transformObject(record: JValue) = {
    record transformField {
      case JField("_msg", msg) => ("_msg", renameBranches(msg))
    }
  }

The renameBranches method above will simply translate fields named branches with array-of-strings values to fields named pkg_branches with the same values and translate fields named branches with array-of-objects values to fields named git_branches with the same values. Other fields are ignored by this code.

Pitfall 3: Too many columns

A related problem comes up when we have JSON objects that use field names as entity names, as in the following example (taken from a message describing a git commit):

{
  "stats" : {
    "files" : {
      ".gitignore" : { "deletions" : 0, "insertions" : 1, "lines" : 1 },
      "cmake-fedora.spec" : { "deletions" : 3, "insertions" : 3, "lines" : 6 },
      "sources" : { "deletions" : 1, "insertions" : 1, "lines" : 2 }
    },
    "total" : { "deletions" : 4, "files" : 3, "insertions" : 5, "lines" : 9 }
  }
}

Note that files is essentially a collection of objects describing different changed files in the commit (.gitignore, cmake-fedora.spec, and sources), but it is represented as an object with a field for each changed file. Because of this, the inferred schema will have a field in the stats.files object for every single filename that has ever been committed to Fedora’s git repository! (That’s a lot of NULL values in a typical row.)

There are other similar cases in the fedmsg data as well where entity names become field names; these can all be handled by transformation rules in fairly straightforward fashion.
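For example, here’s a rough sketch (using the same json4s transformField facility as the renameBranches example above) of a rule that rewrites a files object into an array of per-file records, each carrying its filename in a name field; the exact output shape is my choice, not something the fedmsg schema prescribes:

// Rewrite { "files": { "<filename>": {...}, ... } } as an array of objects so
// that the inferred schema gets a single "files" array column rather than one
// column per filename that has ever been committed.
def filesToArray(msg: JValue) = {
  msg transformField {
    case JField("files", JObject(entries)) =>
      ("files", JArray(entries.map { case (name, stats) =>
        JObject(JField("name", JString(name)) :: JField("stats", stats) :: Nil)
      }))
  }
}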

What’s next

This post should have enough background to help you get started looking at fedmsg data with Spark SQL. In future posts, I’ll discuss some additional correctness and efficiency considerations; demonstrate how to unify warehoused, batched, and streaming data; and introduce some analytic applications involving fedmsg data.


  1. I’m most familiar with Fedora, but the Debian community has also begun to use fedmsg to publish events about their infrastructure.

  2. I’ve recently been reading Jay Kreps’ I ♥ Logs, in which Kreps argues (from principle and from experience) for the centrality of the log abstraction in distributed systems. I may write a longer review later, but the argument is sound (and consonant with some of the arguments initially made for basing Fedora’s infrastructure around a messaging bus) and the book is definitely worth checking out!

  3. See the distgit source for the fedmsg package for an example.

  4. For more information, read the docs or check out the slides from Ralph Bean’s client-oriented or overview presentations.

  5. There are also tables to store the names of users, the names of packages, and to model the relationships between users or packages and fedmsg messages involving those users or packages.

  6. I’m using JSON here because of the rich structure of the topic-specific messages. If you were ingesting flat relational data, it would almost certainly make more sense to use JdbcRDD and do the ingest within Spark.

  7. Please don’t use the above code in production; it’s intended to be simple, not efficient. Producing one file per record makes it easy to see where things are going wrong, but it’s very slow both for this program and for the Spark jobs that you’ll want to write to read these data. Your local filesystem probably won’t be too happy with you, either.

I went to Strata + Hadoop World last week. This event targets a pretty broad audience and is an interesting mix of trade show, data science conference, and software conference. However, I’ve been impressed by the quality and relevance of the technical program both of the years that I’ve gone. The key to finding good talks at this kind of event is to target talks focusing on applications, visualization, and fundamental techniques, rather than ostensibly technical talks.1 In the rest of this post, I’ll share some of my notes from the talks and tutorials I enjoyed the most.

D3.js

The first event I attended was Sebastian Gutierrez’s D3 tutorial. (Gutierrez’s slides and sample code are available online.) A great deal of this session was focused on introducing JavaScript and manipulating the DOM with D3, which was great for me; I’ve used D3 before but my knowledge of web frontend development practice is spotty and ad-hoc at best.

This was a great tutorial: the pace, scope, content, and presentation were all really excellent. It provided enough context to make sense of D3’s substantial capabilities and left me feeling like I could dive in to using D3 for some interesting projects. Finally, Gutierrez provided lots of links to other resources to learn more and experiment, both things I’d seen, like bl.ocks.org, and things that were new to me, like tributary.io and JSFiddle.

The standalone D3 tutorial on DashingD3JS.com seems like an excellent way to play along at home.

Engineering Pipelines for Learning at Scale

I attended the “Hardcore Data Science” track in the afternoon of the tutorial day; there were several interesting talks but I wanted to call out in particular Ben Recht’s talk, which mentioned BDAS in the title but was really about generic problems in developing large-scale systems that use machine learning.

Recht’s talk began with the well-known but oft-ignored maxim that machine learning is easy once you’ve turned your analysis problem into an optimization problem. I say “oft-ignored” here not because I believe practitioners are getting hung up on the easy part of the problem, but because it seems like raw modeling performance is often the focus of marketing for open-source and commercial projects alike.2 The interesting engineering challenges are typically in the processing pipeline from raw data to an optimizable model: acquiring data, normalizing and pruning values, and selecting features all must take place before modeling and classification. Learning problems often have this basic structure; Recht provided examples of these pipeline stages in object recognition and text processing domains.

The main motivating example was Recht’s attempt to replicate the Oregon State Digital Scout project, which analyzed video footage of American football and answered questions including what plays were likely from given offensive formations. So Recht’s team set out to stitch together video footage into a panorama, translate from video coordinates to inferred field coordinates, and track player positions and movements. This preprocessing code, which they had implemented with standard C++ and the OpenCV library, took ten hours to analyze video of a four-second NFL play. Expert programmers could surely have improved the runtime of this code substantially, but it seems like we should be able to support software engineering of machine-learning applications without having teams of experts working on each stage of the learning pipeline.

The talk concluded by presenting some ideas for work that would support software engineering for learning pipelines. These were more speculative but generally focused on using programming language technology to make it easier to develop, compose, and reason about learning pipelines.

Keynotes

Since the talks from the plenary sessions are all fairly brief (and freely available as videos), I’ll simply call out a couple that were especially enjoyable.

Domain-Specific Languages for Data Transformation

Joe Hellerstein and Sean Kandel of Trifacta gave a talk on domain-specific languages for data transformation in general and on their Wrangle system in particular. (Trifacta is also responsible for the extremely cool Vega project.) The talk led with three rules for successful data wrangling, which I’ll paraphrase here:

  • your processes, not your data, are most important;
  • reusable scripts to process data are more valuable than postprocessed and cleaned data; and
  • “agile” models in which preprocessing is refined iteratively in conjunction with downstream analysis of the processed data are preferable to having preprocessing take place as an isolated phase.3

The first act of the talk also referenced this quotation from Alfred North Whitehead, which set the tone for a survey of DSLs for data transformation and query construction:

By relieving the brain of all unnecessary work, a good notation sets it free to concentrate on more advanced problems…

In the 1990s, several data processing DSLs emerged that were based on second-order logic, proximity joins, and composable transformations.4 Hellerstein argued that, since these underlying formalisms weren’t any simpler to understand than the relational model, the DSLs weren’t particularly successful at simplifying data transformation for end-users.

Raman and Hellerstein’s Potter’s Wheel system presented a visual interface to SQL for data cleaning; it could automatically detect bad data and infer structure domains but wasn’t particularly easier to use than SQL: putting a visual, menu-driven interface over a language doesn’t generally lower the cognitive load of using that language.5

The Wrangler system (which forms the basis for the Wrangle DSL in Trifacta’s product) attacks the problem from a different angle: data cleaning operations are suggested based on explicit user guidance, semantic information about types and domains, and inferences of user intent based on prior actions. The entire problem of suggesting queries and transformations is thus expressible as an optimization problem over the space of DSL clauses. With the interface to the Wrangle DSL, the user highlights features in individual rows of the data to appear in the transformed data or in a visualization, and the system presents several suggested queries that it had inferred. The Wrangle system also infers regular expressions for highlighted text transformations, but since regular expressions — and especially machine-generated ones — are often painful to read, it translates these to a more readable (but less expressive) representation before presenting them to the user.

Kandel gave a very impressive live demo of Wrangle on a couple of data sets, including techniques for sensibly treating dirty or apparently-nonsensical rows (in this example, the confounding data included negative contribution amounts from election finance data, which represented refunds or revised accounting). This is excellent work at the intersection of databases, programming languages, machine learning, and human-computer interaction — which is a space that I suspect a lot of future contributions to data processing will occupy.

Clustering for Anomaly Detection

Sean Owen’s talk on using Spark and k-means clustering for anomaly detection was absolutely packed. Whether this was more due to the overwhelming popularity of talks with “Spark” in the title or because Sean is widely known as a sharp guy and great presenter, I’m not sure, but it was an excellent talk and was about more than just introducing clustering in Spark. It was really a general discussion of how to go from raw data to a clustering problem in a way that would give you the most useful results — Spark was essential to make the presentation of ETL and clustering code simple enough to fit on a slide, but the techniques were generally applicable.

The running example in this talk involved system log data from a supervised learning competition. Some of the log data was generated by normal activity and some of it was generated by activity associated with various security exploits. The original data were labeled (whether as “normal” or by the name of the associated exploit), since they were intended to evaluate supervised learning techniques. However, Owen pointed out that we could identify anomalies by clustering points and looking at ones that were far away from any cluster center. Since k-means clustering is unsupervised, one of Owen’s first steps was to remove the labels.

With code to generate an RDD of unlabeled records, Owen then walked through the steps he might use if he were using clustering to characterize these data:

  • The first question involves choosing a suitable k. Since Owen knew that there were 23 different kinds of records (those generated by normal traffic as well as by 22 attack types), it seems likely that there would be at least 23 clusters. A naïve approach to choosing a cluster count (viz., choosing a number that minimizes the distance from each point to its nearest centroid) falls short in a couple of ways. First, since cluster centers are originally picked randomly, the results of finding k clusters may not be deterministic. (This is easy to solve by ensuring that the algorithm runs for a large number of iterations and has a small distance threshold for convergence.) More importantly, though, finding n clusters, where n is the size of the population, would be optimal under this metric, but it would tell us nothing.
  • The vectors in the raw data describe 42 features, but the distance between them is dominated by two values: bytes read and bytes written, which are relatively large integer values (as opposed to many of the features, which are booleans encoded as 0 or 1). Normalizing each feature by its z-score made each feature contribute equally to distance (see the sketch after this list).
  • Another useful trick is encoding features whose space is a finite domain of n elements as n boolean features (where only one of them will be true). So if your feature could be either apple, banana, cantaloupe, or durian in the raw data, you could encode it as four boolean-valued features, one of which is true if and only if the feature is apple, one of which is true if and only if the feature is banana, and so on.
  • Using entropy (or normalized mutual information) to evaluate the clustering can be a better bet since fitness by entropy won’t increase until k = n as fitness by Euclidean distance will. Another option is restoring the labels from the original data and verifying that clusters typically have homogeneous labels.
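Here’s a minimal sketch of the two preprocessing tricks from the middle bullets above — z-score normalization and one-of-n encoding — over an RDD of raw feature vectors. This is just an illustration of the ideas, not Owen’s code:

import org.apache.spark.rdd.RDD

// Replace each feature with its z-score so no single feature dominates distance.
def zScoreNormalize(data: RDD[Array[Double]]): RDD[Array[Double]] = {
  val n = data.count
  val sums = data.reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
  val means = sums.map(_ / n)
  val stdevs = data
    .map(v => v.zip(means).map { case (x, m) => (x - m) * (x - m) })
    .reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
    .map(ss => math.sqrt(ss / n))
  data.map(v => v.zip(means.zip(stdevs)).map {
    case (x, (m, s)) => if (s == 0.0) 0.0 else (x - m) / s
  })
}

// Encode a categorical value drawn from a finite domain as one boolean feature
// per possible value (exactly one of which will be 1.0).
def oneHot(value: String, domain: Seq[String]): Array[Double] =
  domain.map(d => if (d == value) 1.0 else 0.0).toArray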

Summary

Many of the talks I found interesting featured at least some of the following common themes: using programming-language technology, improving programmer productivity, or focusing on the pipeline from raw data to clean feature vectors. I especially appreciated Sean Owen’s talk for showing how to refine clustering over real data and demonstrating substantial improvements by a series of simple changes. As Recht reminds us, the optimization problems are easy to solve; it’s great to see more explicit focus on making the process that turns analysis problems into optimization problems as easy as possible.


  1. Alas, talks with technical-sounding abstracts often wind up having a nontrivial marketing component!

  2. This is related to the reason why “data lakes” have turned out to be not quite as useful as we might have hoped: a place to store vast amounts of sparse, noisy, and irregular data and an efficient way to train models from these data are necessary but not sufficient conditions for actually answering questions!

  3. Hellerstein pointed out that the alternative to his recommendation is really the waterfall model (seriously, click that link), which makes it all the more incredible that anyone follows it for data processing. Who would willingly admit to using a practice initially defined as a strawman?

  4. SchemaSQL and AJAX were provided as examples.

  5. I was first confronted with the false promise of visual languages in the 1980s. I had saved my meager kid earnings for weeks and gone to HT Electronics in Sunnyvale to purchase the AiRT system for the Amiga. AiRT was a visual programming language that turned out to be neither more expressive nor easier to use than any of the text-based languages I was already familiar with. With the exception of this transcript of a scathing review in Amazing Computing magazine (“…most of [its] potential is not realized in the current implementation of AiRT, and I cannot recommend it for any serious work.”), it is now apparently lost to history.

leitmotif is a very simple templating tool that generates directories from prototypes stored in git repositories. Its design prioritizes simplicity and a minimal set of external dependencies.

While the leitmotif tool is still under development and some interesting features are planned for future work, it is already useful for many project-templating tasks. This post will show you how to get started.

Installing Leitmotif

You can install leitmotif either by using the RubyGem or by using the standalone version, which only depends on Ruby 2 and the Thor gem.

To install the RubyGem, simply run gem install leitmotif.

To install the standalone script, run the following commands:

gem install thor --version=0.17 && \
    curl -O https://github.com/willb/leitmotif/raw/master/extra/standalone/leitmotif && \
    chmod 755 leitmotif

In the future, I expect that leitmotif packages will be available for CentOS and Fedora.

Using Leitmotif

The leitmotif tool is self-documenting. Run leitmotif help to see options.

Copying a prototype from a remote repository

Use leitmotif clone URL to make a local copy of the remote prototype repository at URL in your local Leitmotif prototype store (under your home directory).

Listing locally-installed prototypes

Use leitmotif list to see the prototypes you have installed in your local store.

Instantiating prototypes

Use leitmotif generate PROTOTYPE OUTPUT_DIR to instantiate PROTOTYPE in OUTPUT_DIR. In this form, PROTOTYPE must be the path to a git repository containing a Leitmotif prototype. This command supports the following options:

  • --local: treat PROTOTYPE as the name of a prototype repository in the local store rather than as a path
  • --clobber: delete OUTPUT_DIR before processing the prototype, if it exists
  • --ref: a git tag, branch, or SHA to use from the prototype repository (defaults to master if unspecified)
  • --bindings KEY:VALUE ...: a list of variable bindings to use while instantiating the prototype
  • --verbose: provide additional output as leitmotif works
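For example, to instantiate a locally-installed prototype named sparkdev (the prototype name and binding below are just illustrative, and exact option placement may vary), you might run something like:

leitmotif generate sparkdev my-new-app --local --bindings name:my-new-app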

Creating Leitmotif prototypes

A Leitmotif prototype is just a git repository with a particular structure. Specifically, a prototype must have two entries in the repository root directory:

  1. a YAML file named .leitmotif that contains metadata about the prototype, and
  2. a directory named proto, which contains the prototype itself.

Prototype metadata

A .leitmotif file is just a YAML representation of a hash of metadata options. The following keys can appear in a .leitmotif file:

  • :name: the name of the prototype (used for documentation)
  • :version: the version of the prototype (used for documentation)
  • :required: a list of variables that must have user-provided values when the prototype is instantiated
  • :ignore: a list of files to copy to the instantiated prototype without processing by the templating engine
  • :defaults: a hash consisting of default values for prototype variables

Here’s an example .leitmotif file, from a prototype for Spark application development:

---
:name: sparkdev
:version: '0'
:required:
  - name
:ignore:
  - sbt
:defaults:
  :version: '0.0.1'
  :scala_version: '2.10.4'
  :spark_version: '1.1.0'

Prototypes and templating

With the exception of the files in the :ignore list, all of the files in a prototype repository are processed by ERB after they’re copied to the output directory. For more details on eRuby, see elsewhere, but here are a few basics to get you started:

  • If a template file contains Ruby code surrounded by <% and %>, that code is evaluated at prototype instantiation time with user-supplied variable bindings.
  • If a template file contains Ruby code surrounded by <%= and %>, that code is evaluated at prototype instantiation time with user-supplied variable bindings and the result of evaluating that code is substituted into the document at that point.

Combining these two, we can see how to use loops in a file:

Specifications of properties are often terser than explicit expected results.  For example:

<% 99.downto(1).each do |bottles| %>
   * <%= bottles %> bottles of beer on the wall, 
     <%= bottles %> bottles of beer; 
     take one down, pass it around, 
     <%= bottles > 1 ? "#{bottles} bottles" : "just one bottle" %> of beer on the wall
<% end %>

The above will generate a Markdown file containing a bulleted list that will strike terror into the heart of any adult who has ever been on a bus full of middle-schoolers.

Coming soon

I wrote this tool to solve an immediate need1 and will be updating it as new requirements become apparent. However, there are a few things that are already on my roadmap:

  • automated test coverage (currently — and shamefully! — there is none)2
  • additional commands, e.g., to inspect installed prototypes
  • post-instantiation actions, e.g., to rename a file or create a directory based on the result of a variable expansion

Of course, I welcome your feedback, issue reports, and patches as well.


  1. Specifically, my need to be lazy when creating new projects.

  2. I’m probably getting too old to program in untyped languages.

I’ve written in the past about what a mistake it is to add behavioral incentives or morality clauses to the licenses for open-source projects. Briefly, these clauses are bad:

  • philosophically because they infringe on the basic software freedom to use a freely-available project for any purpose1;
  • legally because they generally rely on vague and subjective terms, thus (paradoxically) rendering your license both radioactive and unenforceable; and
  • practically because instead of preventing people from doing things that you don’t like, they just prevent people — even those who wouldn’t be inclined to do things you don’t like! — from using your work.

One software domain in which licenses with usage restrictions are commonplace (and, unfortunately, widely accepted) is digital fonts. This has historically been a problem for Fedora, since many “free” fonts are not open-source (in that they do not permit modification) or have usage restrictions (in particular, against “commercial” use). Furthermore, much like novel one-off open-source software licenses, most partially-free licenses authored by font creators are unlikely to be unambiguous or legally sensical.2

The problem gets far worse when we look at commercial fonts, for which the type of application and type of output are often incorporated in usage restrictions. Last year, I licensed a font whose EULA was self-contradictory; you can click the link for the whole story, but the main problem was that it claimed to broadly allow rasterization, including use to “create images on computer screens, paper, web pages, photographs, movie credits, printed material, T-shirts, and other applications,” but that I couldn’t use the font “as part of broadcasting video or film […] the use of the font software in titling, credits or other text for any onscreen broadcast via television, video, or motion picture.”

Since one of my creative endeavors is editing home and bicycling videos, the combination of being allowed to “create … movie credits” but not to “use the font software in titling, credits or other text for … motion picture” was pretty frustrating, especially since the EULA hadn’t been obviously available for review until after I licensed the face. (There were numerous other problems, ambiguities, and contradictions with the license, and when I asked the foundry for a clarification, they curtly denied that there was anything to be confused about.)

For more than a year and a half, this inconsistent license has been my benchmark to evaluate whether or not a font is more trouble than it’s worth, and I’ve at least learned to be more careful reading font EULAs.3 However, I recently came across an example4 that so completely outclasses the contradictory license that I can’t imagine it will be replaced as the new standard for terrible usage clauses in licenses. Here’s an excerpt; the author’s name is redacted:

Fonts by [redacted] may NOT be used for pornographic, derogatory, defamatory or racist material (in any form, printed or virtual); fonts by [redacted] may NOT be used by individuals or companies involved in child abuse, child pornography or child labour; fonts by [redacted] may NOT be used by individuals or companies involved in destruction of natural resources and/or habitat, logging (even legal), palm oil exploitation/harvesting, tuna fishing, whaling, animal trafficking, oil and/or gas drilling or transporting and mining. Fonts by [redacted] may NOT be used by individuals or companies promoting an unhealthy lifestyle (fast food, energy drinks, foods containing GM ingredients). Fonts by [redacted] may NOT be used by companies or individuals involved in Genetic Modification / Genetic Alteration of organisms. Fonts by [redacted] may NOT be used by individuals or companies involved in fur trade, or making use of fur. Fonts by [redacted] may NOT be used by missionaries, individuals or institutions of any creed or faith for the purpose of converting others to their creed or faith. Fonts by [redacted] may NOT be used to instigate terror, hate, intolerance, fear or racism.

I almost don’t know where to begin with this one, since it raises so many questions for the licensor:

  • When do we cross the line from light-hearted ribbing to “derogatory material”? Are satire and parody proscribed?
  • Say I use this font to title a bicycling video; would the licensor need to see what sort of chain lubricant I used or whether or not I was a regular inner-tube recycler before determining that I wasn’t involved in “destruction of natural resources”? (What about people who eat animal protein? Or plants?)
  • Are we mostly worried about the Indonesian orangutan, or is (relatively-low-impact) West African palm oil OK? Is the palm oil industry full of typophiles? How many jars of palm oil bore the licensor’s glyphs before he added this clause?
  • How are we defining “unhealthy” or “genetic modification”? It seems like consensus regularly changes on the former and the latter is — like much of the language in this clause — actually kind of vague despite being deployed in an absolute manner. Can I replant seeds from my best tomatoes or must I toss them? (In a related question, since I have the suspicion that it might come up in a future revision of this license: is it OK to use this font to promote vaccination?)
  • What about people of faith whose creed includes a concept of vocation, or the notion that they serve God through their creative and professional work?

Hopefully, these kinds of questions — along with many more like them that you may be asking yourself — underscore why this kind of license is a problem: the licensor probably hoped to make a clear statement of principles, condemn what he saw as social ills, and avoid assisting people and groups he’d find objectionable. Instead, the license restrictions are so broad as to exclude use by anyone except the font’s creator (who is unlikely to sue himself for breach of contract). No matter who you are or what you do, if the licensor wanted to sue you, he probably could.

I’m inclined to excuse that last sentence, though, since the licensor seems to know a thing or two about instigating “hate, intolerance, [and] fear.”


  1. The GNU Project calls this “Freedom 0.”

  2. If you’re releasing open-source software, you should use a well-known license. If you want to release a font freely, you should have a very good reason for using anything except the Open Font License or the LPPL.

  3. This may be surprising to people used to regular software licensing, but I’ve seen fonts for sale in different places with different licenses! It seems like some sellers have standard EULAs and require font creators to allow distribution under these, while others maintain the creators’ EULAs.

  4. Alas, I came across this example after paying for a license.

As everyone knows, internet comments are pure, unadulterated evil.1 But it’s nonetheless nice when blogs give readers an easy way to provide feedback. I recently reworked my Octopress installation to allow readers to reply to my posts via Twitter or email; this post will show you how to set it up for yourself. We’ll be using Twitter’s Web intents API to make it happen.

Step 1: Configuration

If you haven’t already set your Twitter username in your Octopress _config.yml, do so now; here’s what mine looks like:

excerpted from _config.yml
# Twitter
twitter_user: willb
twitter_tweet_button: false

Step 2: Create a new partial layout

The next thing we’ll do is add a new partial layout that generates reply-via-Twitter links. Create source/_includes/post/feedback.html in your Octopress directory and give it the following contents:

source/_includes/post/feedback.html
 • You may
 {% if site.twitter_user and page.skip_twitter_feedback != true %}
    <script type="text/javascript" src="//platform.twitter.com/widgets.js"></script>
    {% capture tweet_text %}@{{site.twitter_user}} re: {{site.url}}{{page.url}}{% endcapture %}
  <a href="https://twitter.com/intent/tweet?text={{ tweet_text | uri_escape }}">reply to this post on Twitter</a> or
 {% endif %}
<a href="mailto:{{site.email}}">email the author</a>.

Note that this will always provide an email feedback link, targeting the site-wide email variable from your _config.yml.2 It will provide reply-via-twitter links if you’ve set the twitter_user configuration variable and haven’t set skip_twitter_feedback: true in your post’s YAML front matter. (I use this to eliminate redundancy for cases like this, in which I’ve asked for replies via Twitter in the body of the post.) When a reader clicks on the reply-via-twitter link, it will take them to Twitter3 with a prepopulated tweet:

[Screenshot: the prepopulated tweet created by the Twitter Web Intent]

Step 3: Add the new partial to your post template

Finally, edit your post template to ensure that it includes feedback.html in an appropriate place. This will obviously depend on your theme and your taste, but I’ve included my post template — which is based on octostrap3 and includes the feedback links directly following the category list — as an example; note the {% include post/feedback.html %} line in the footer:

source/_layouts/post.html
---
layout: default
navbar: Blog
single: true
---

<div class="row">
  <div class="page-content {% unless page.sidebar == false or site.post_asides == empty and site.default_asides == empty %}col-md-9{% else %}col-md-12{% endunless %}" itemscope itemtype="http://schema.org/Blog">
    <meta itemprop="name" content="{{site.title}}" />
    <meta itemprop="description" content="{{site.description}}" />
    <meta itemprop="url" content="{{site.url}}" />
    <article class="hentry" role="article" itemprop="blogPost" itemscope itemtype="http://schema.org/BlogPosting">
      {% include article.html %}
      <footer>
        <p class="meta text-muted">
          {% include post/author.html %}
          {% include post/date.html %}{% if updated %}{{ updated }}{% else %}{{ time }}{% endif %}
          {% include post/categories.html %}
          {% include post/feedback.html %}
        </p>
        {% unless page.sharing == false %}
          {% include post/sharing.html %}
        {% endunless %}
        {% if page.previous.url or page.next.url %}
          <ul class="meta text-muted pager">
            {% if page.previous.url %}
            <li class="previous"><a href="{{page.previous.url}}" title="Previous Post: {{page.previous.title}}">&laquo; {{page.previous.title}}</a></li>
            {% endif %}
            {% if page.next.url %}
            <li class="next"><a href="{{page.next.url}}" title="Next Post: {{page.next.title}}">{{page.next.title}} &raquo;</a></li>
            {% endif %}
          </ul>
        {% endif %}
      </footer>
    </article>

    {% if site.disqus_short_name and page.comments == true %}
      <section>
        <h1>Comments</h1>
        <div id="disqus_thread" aria-live="polite">{% include post/disqus_thread.html %}</div>
      </section>
    {% endif %}
  </div>

  {% unless page.sidebar == false or site.post_asides == empty and site.default_asides == empty %}
  <aside class="sidebar col-md-3">
    {% if site.post_asides.size %}
      {% include_array post_asides %}
    {% else %}
      {% include_array default_asides %}
    {% endif %}
  </aside>
  {% endunless %}
</div>

Step 4: There is no Step 4

I hope this was helpful! If you have feedback on this post, I bet you can figure out where to find me.


  1. Only mostly kidding.

  2. If you haven’t set this variable, you can replace {{site.email}} with your email address. You can also replace the whole mailto link with the results of some obfuscation technique like the Hivelogic Enkoder if you still care about concealing your email address from spambots.

  3. By default, it will open this in a pop-up window unless your users are blocking Javascript from Twitter. If you’re interested in avoiding pulling down scripts from external sites, eliminating the script link to platform.twitter.com/widgets.js will only disable the pop-up window, not affect reply-via-tweet functionality in general. (You can, of course, use well-known techniques to open the link in a pop-up window without the external script.)

One of my side projects this year has been using Apache Spark to make sense of my bike power meter data. There are a few well-understood approaches to bike power data modeling and analysis, but the domain has been underserved by traditional machine learning approaches, and I wanted to see if I could quickly develop some novel techniques. My experiments so far are available as open-source code and they have been quite successful in two ways: not only have I mined some useful information from my rides, but I’ve also demonstrated that Spark is responsive enough to be a good substitute for R in exploratory use cases.

However, the code I’ve produced up to this point has really been more focused on experimenting with analytic techniques rather than on scalable implementations. I’ve valued obvious correctness and simplicity over performance, in some cases making implementation decisions that were obviously suboptimal in the interest of having simple implementations. (The blessing and the curse of declarative programming is that you’re insulated, to some extent, from how your application actually executes!)

Since publishing the code, giving talks about it, and demoing it, I’ve decided that the basic approach is fairly sensible. I’ve also seen a lot of interest from teammates and other cyclists who’d like to analyze their own data, and I’d like to eventually make the code available as a public-facing service — which means that it’s time to adjust my tradeoffs to favor efficiency a bit more.

In this post, we’ll look at several minor changes (and one major change) that improved my code’s performance, identifying the common issues that each uncovered. This certainly won’t be an exhaustive catalog of Spark optimization tips, and I’m not claiming that the code I wound up with is perfect, but keeping these issues in mind should improve your Spark apps. We’ll cover using broadcast variables to distribute data to workers, using caching sensibly, being sensitive to data shuffles, and designing applications to take advantage of laziness.

Background

The particular code we’re looking at is from one of the applications I discussed at Spark Summit this year.1 The question this app aims to answer is “given historical ride data, where am I likely to get my best mean n-second power?” Its basic approach is:

  1. load activity data from disk, transforming it into a collection of Trackpoint records,
  2. optimize a k-means clustering (with a relatively large k) for the spatial locations of each Trackpoint,
  3. find the mean power for each overlapping n-second window of each activity,
  4. for each pair (i, j) of spatial clusters, reject all but the best n-second efforts starting in cluster i and ending in cluster j, and
  5. finally, plot the trackpoints from the 20 best remaining efforts.

The Trackpoint structure is designed to capture all the metrics2 in each sample recorded in a Garmin TCX file; it is a Scala case class with timestamp, coordinates, altitude, wattage, and activity identity. Once we’ve processed our input files into Trackpoint records, we’ll group these records by activity, giving us a collection of pairs that encode a mapping from activity names to sequences of trackpoints, like this:

Schematic of per-activity trackpoint collections
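
For reference, here’s a rough sketch of what such a record might look like; the field names are inferred from the code excerpts later in this post, the types are assumptions, and Coordinates is a hypothetical stand-in for whatever the real latitude/longitude representation is.

// a minimal sketch of a Trackpoint-like record (field names follow the excerpts
// below; types and the Coordinates helper are assumptions)
case class Coordinates(lat: Double, lon: Double)

case class Trackpoint(
  timestamp: Long,          // when the sample was recorded
  latlong: Coordinates,     // where the sample was recorded
  altitude: Double,         // elevation
  watts: Double,            // instantaneous power reading
  activity: Option[String]  // identifier of the activity this sample came from
)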

Since dealing with windowed data is common to several of my bike data applications, I had factored it out into a common trait:

excerpted from common.scala
trait ActivitySliding {
  import org.apache.spark.rdd.RDD
  import com.freevariable.surlaplaque.data.Trackpoint

  // group trackpoints by activity, then split each activity's samples into
  // overlapping period-sample windows keyed by (activity, window offset)
  def windowsForActivities[U](data: RDD[Trackpoint], period: Int, xform: (Trackpoint => U) = identity _) = {
    val pairs = data.groupBy((tp:Trackpoint) => tp.activity.getOrElse("UNKNOWN"))
    pairs.flatMap({case (activity:String, stp:Seq[Trackpoint]) => (stp sliding period).zipWithIndex.map {case (s,i) => ((activity, i), s.map(xform))}})
  }

  def identity(tp: Trackpoint) = tp
}

The windowsForActivities function maps the data into overlapping period-sample windows. Since it’s not possible, in general, to efficiently calculate sliding windows over RDDs3, we operate on materialized sequences of trackpoints, and return pairs representing a mapping from pairs of activity identifiers and sample offsets to sequences representing sample windows, like this:

Diagram of sample windows
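
The windowing itself is just Scala’s sliding applied to each activity’s samples; here’s a toy example of what those overlapping windows and their offsets look like:

// overlapping 3-element windows paired with their offsets, analogous to what
// windowsForActivities produces for each activity (toy data)
val windows = List(10, 20, 30, 40, 50).sliding(3).zipWithIndex.toList
// List((List(10, 20, 30), 0), (List(20, 30, 40), 1), (List(30, 40, 50), 2))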

The windowsForActivities function also takes an optional transformation function argument (xform), which can be used to map Trackpoint records after splitting them into sliding windows. In my case, I use it to discard metrics that aren’t relevant to a given application. Since our first approach to finding best efforts involves keeping every effort window around until we know which ones we’ll ultimately care about,4 switching to a leaner trackpoint structure can save a lot of memory and improve performance. For this application, translating trackpoint records to simpler records (containing only latitude, longitude, and wattage) resulted in approximately a 2x speedup on my data.
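
The stripTrackpoints function used below is declared elsewhere in the application; conceptually it does something like the following sketch, where TinyTrackpoint is a hypothetical name for the leaner record:

// hypothetical sketch: keep only the fields this analysis actually needs
case class TinyTrackpoint(latlong: Coordinates, watts: Double)

def stripTrackpoints(tp: Trackpoint): TinyTrackpoint =
  TinyTrackpoint(tp.latlong, tp.watts)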

In the rest of this post, we’ll look at the initial approach and some iterative refinements. For each refinement, we’ll look at the general Spark application performance issues it uncovered as well as the particular speedups I saw. (We’ll also look at a change that I thought would improve performance but didn’t.) First, though, a word of caution: I tried to do meaningful and repeatable timings, but I was working on my personal workstation (hardly a controlled environment) and didn’t do anything resembling a rigorous performance evaluation. Furthermore, your mileage may vary; be sure to thoroughly test improvements on your own applications and data!

The first pass

We’ll be looking at a small function in which PowerBestsApp spends most of its time; this is the function that finds the best n-sample efforts after filtering out all but the best effort starting and ending in a given pair of clusters. Initially, it looks like this:

excerpted from power_bests.scala
def bestsForPeriod(data: RDD[Trackpoint], period: Int, app: SLP, model: KMeansModel) = {
  val windowedSamples = windowsForActivities(data, period, stripTrackpoints _).cache
  val clusterPairs = windowedSamples
    .map {case ((activity, offset), samples) => ((activity, offset), (closestCenter(samples.head.latlong, model), closestCenter(samples.last.latlong, model)))}
  val mmps = windowedSamples.map {case ((activity, offset), samples) => ((activity, offset), samples.map(_.watts).reduce(_ + _) / samples.size)}

  val top20 = mmps.join(clusterPairs)
   .map {case ((activity, offset), (watts, (headCluster, tailCluster))) => ((headCluster, tailCluster), (watts, (activity, offset)))}
   .reduceByKey ((a, b) => if (a._1 > b._1) a else b)
   .map {case ((headCluster, tailCluster), (watts, (activity, offset))) => (watts, (activity, offset))}
   .sortByKey(false)
   .take(20)

  app.context.parallelize(top20, app.context.defaultParallelism * 4)
   .map {case (watts, (activity, offset)) => ((activity, offset), watts)}
   .join (windowedSamples)
   .map {case ((activity, offset), (watts, samples)) => (watts, samples)}
   .collect
}

When we invoke bestsForPeriod, we already have an RDD of trackpoint records corresponding to every sample in every activity under consideration. We have also optimized a set of k cluster centers (I typically use k=128 for my data) and have the resulting model available. In the body of this function, we:

  • calculate and cache the overlapping sample windows for each activity, converting each Trackpoint record (with the stripTrackpoints function, declared elsewhere) into a minimal record consisting only of latitude, longitude, and wattage (line 2),
  • we declare clusterPairs, an RDD of pairs in which window identifiers (that is, activity-offset pairs) are the keys and pairs of cluster centers (one for the starting cluster and one for the ending cluster) are the values (lines 3—4),
  • we declare mmps, an RDD of pairs in which the keys are window identifiers and the values are mean wattages for the corresponding window of samples (line 5),
  • we then find the top twenty efforts, only considering the best effort starting and ending in a given pair of clusters:
    • first, we join mmps with clusterPairs, giving us an RDD mapping from window identifiers to tuples of mean wattages and start-end cluster pairs (line 7),
    • we transform this mapping so that it’s keyed by the start-end clusters of each window (line 8),
    • and reject all but the best effort for each start-end cluster pair (line 9),
    • we then re-key the resulting pairs by mean wattage (line 10),
    • and sort by mean wattage in descending (hence false) order (line 11)
    • now, up until this point, we haven’t actually done anything besides construct a directed acyclic graph of RDD dependencies, since we’ve exclusively invoked RDD transformations, which are lazy.5 However, in line 12, we’re going to force computation of the mean wattage-window identifier pairs and send the twenty best ones back to the driver program
  • we convert the collection of pairs back to an RDD (line 14; note that we’re assuming low default parallelism or possible expansion to consider more than 20 best efforts), and immediately transpose each pair to re-key by window identifier (line 15),
  • join this collection with the windows themselves (line 16), resulting in a map from window identifiers to tuples of mean wattages and the samples themselves6 that contains only our best 20 efforts, and
  • strip the window identifier from each tuple, leaving only pairs of wattages and windows (line 17) before firing off a Spark job to actually do the computations we described in lines 14—17, returning the results to the driver program (line 18)

This code runs quickly enough for prototyping, but it could be a lot faster. Experienced Spark programmers likely see several opportunities for improvement here; hopefully the changes we’re about to make will cover most of them.

Broadcasting: a simple win

The first and easiest change we can make is to ensure that we aren’t passing too much data in closure environments, since that data has to be serialized and shipped to workers (in my case, on a single multiprocessor workstation, but potentially across a cluster). Line 4 in our original function relied on having a KMeansModel available in the closure’s environment. Now, a k-means model for 128 cluster centers isn’t huge by any means, but it’s also not a trivial thing to have to serialize along with a relatively small closure. By passing the model in a Spark broadcast variable instead, we can reduce the cost of creating and executing the RDD transformations that depend on it:

excerpted from power_bests.scala
def bestsForPeriod(data: RDD[Trackpoint], period: Int, app: SLP, model: Broadcast[KMeansModel]) = {
  val windowedSamples = windowsForActivities(data, period, stripTrackpoints _).cache
  val clusterPairs = windowedSamples
    .map {case ((activity, offset), samples) => ((activity, offset), (closestCenter(samples.head.latlong, model.value), closestCenter(samples.last.latlong, model.value)))}

    // ... rest of function omitted
}

Note that all I did here was change bestsForPeriod to expect a Spark broadcast variable instead of a raw KMeansModel, and then pass model.value to the closestCenter function where it had previously expected a KMeansModel. This very simple change resulted in an approximately 1.1x speedup on my code. If you have large read-only data (whether bigger models or anything else) that needs to go to Spark workers with closure arguments, you might see even bigger improvements.

To create a broadcast variable, simply call the broadcast method of SparkContext with the value you wish to broadcast as a parameter:

Broadcast variable example
scala> val evenNumbers = (0 to 100000 by 2).toArray
evenNumbers: Array[Int] = Array(0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198, 200, 202, 204, 206, 208, 210, 212, 214, 216, 218, 220, 222, 224, 226, 228, 230, 232, 234, 236, 238, 240, 242, 244, 246, 248, 250, 252, 254, 256, 258, 260, 262, 264, 266, 268, 270, 272, 274, 276, 278, 280, 282, 284, 286, 288, 290, 292, 294, 296, 298, 300, 302, 304, 306, 308, 310, 312, 314, 316, 318, 320, 322, 324, 326, ...
scala> val broadcastEvens = sc.broadcast(evenNumbers)
14/09/09 14:45:40 INFO storage.MemoryStore: ensureFreeSpace(200048) called with curMem=0, maxMem=9239474995
14/09/09 14:45:40 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 195.4 KB, free 8.6 GB)
broadcastEvens: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
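
Tasks running on workers then read the broadcast data through the variable’s value member; here’s a trivial, made-up example using the array we just broadcast:

// keep only the numbers that appear in the broadcast array; each task reads
// broadcastEvens.value locally instead of serializing the whole array into the closure
val evensOnly = sc.parallelize(1 to 1000).filter(n => broadcastEvens.value.contains(n))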

Eliminating counterproductive caching

One of the big benefits to using Spark over other distributed computing frameworks is that intermediate results can be cached in cluster memory. However, caching can be both a gift and a curse: it’s always possible to cache something, even something you use many times, whose impact on your memory usage far outweighs the cost of recomputing it. In our case, the largest collection we deal with is the windowedSamples RDD, which, depending on the window size, can be hundreds of times larger than the input data. Actually computing the windows is not particularly expensive, they aren’t used many times, and we use only a tiny fraction of the windows at the end of the function in any case. By eliminating the .cache call on windowedSamples, we acknowledge that it is cheaper to recompute the few windows we’ll need to examine more than once than it is to dedicate the memory necessary to keeping the windows around. This change resulted in an approximately 1.75x speedup on my code.
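
Concretely, the change amounts to dropping a single method call on the first line of bestsForPeriod:

// before: materialize and cache every window (potentially hundreds of times larger than the input)
val windowedSamples = windowsForActivities(data, period, stripTrackpoints _).cache

// after: let Spark recompute the few windows we revisit rather than pinning them all in memory
val windowedSamples = windowsForActivities(data, period, stripTrackpoints _)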

I hesitate to call this a simple fix, since it requires some insight into your application. But as we’ve already seen with the Trackpoint transformation, memory pressure can make a huge difference to Spark applications. Just because you can cache something doesn’t mean you should. (This is a lesson that users of tabled Prolog systems learn quickly: if you start by tabling — that is, memoizing — every relation in your database, you’ll have a fast, expressive environment until everything explodes and your computer becomes a pile of glowing sand.)

Being sensitive to shuffles

Recall that when Spark actually executes a job, it pipelines as many individual tasks as possible into stages, which can be executed on workers without coordinating or shuffling data.7 Ideally, stages will consist of several tasks to allow for high utilization; intuitively, we want workers to spend far more time computing results than coordinating with the driver. However, when RDD transformations change the keys in a collection of pairs (for example), we force Spark to coordinate between workers, shuffling RDD elements around in order to repartition them.
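
If you want to see where those stage boundaries fall in your own code, RDD.toDebugString prints an RDD’s lineage, including its shuffle dependencies; here’s a tiny, self-contained example:

// map is a narrow transformation and is pipelined into the same stage as its parent;
// reduceByKey needs all values for a key in one place, so it introduces a shuffle
// (and thus a stage boundary), which shows up in the printed lineage
val words = sc.parallelize(Seq("a", "b", "a", "c"))
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
println(counts.toDebugString)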

An interesting non-improvement

Since our application deals with some fairly large collections (windowedSamples), shuffling RDDs can be especially painful. However, because of the way we want to use the data, it’s hard to eliminate all of the shuffles in our method. So instead, we’ll examine the impact of the shuffle by looking at another version of bestsForPeriod that starts by fusing the two transformations on windowedSamples (from lines 3—5 in our original excerpt) into one transformation:

excerpted from power_bests.scala, with transformation fusion
def bestsForPeriod(data: RDD[Trackpoint], period: Int, app: SLP, model: Broadcast[KMeansModel]) = {
  val windowedSamples = windowsForActivities(data, period, stripTrackpoints _)
  val bests = windowedSamples.map {
    case ((activity, offset), samples)  =>
      (
        (closestCenter(samples.head.latlong, model.value), closestCenter(samples.last.latlong, model.value)),
        (samples.map(_.watts).reduce(_ + _) / samples.size, (activity, offset))
      )
  }

  val top20 = bests.reduceByKey ((a, b) => if (a._1 > b._1) a else b)
   .map {case ((headCluster, tailCluster), (watts, (activity, offset))) => (watts, (activity, offset))}
   .sortByKey(false)
   .take(20)

  app.context.parallelize(top20, app.context.defaultParallelism * 4)
   .map {case (watts, (activity, offset)) => ((activity, offset), watts)}
   .join (windowedSamples)
   .map {case ((activity, offset), (watts, samples)) => (watts, samples)}
   .collect
}

This transformation fusion essentially eliminates one of the map transformations and also a join, giving us a collection of tuples of start-end cluster pairs and pairs of mean wattages and window identifiers. Interestingly, eliminating these transformations made the application slower! In fact, it ran approximately 19% slower than the previous version. However, this code affords more opportunities to eliminate re-keying and shuffles, and in so doing, we can once again achieve comparable performance to the previous version.

Here’s where the code wound up after several transformations:

excerpted from improved power_bests.scala, with transformation fusion
def bestsForPeriod(data: RDD[Trackpoint], period: Int, app: SLP, model: Broadcast[KMeansModel]) = {
  val windowedSamples = windowsForActivities(data, period, stripTrackpoints _)

  val bests = windowedSamples.map {
    case ((activity, offset), samples)  => (
      (closestCenter(samples.head.latlong, model.value), closestCenter(samples.last.latlong, model.value)),
      ((activity, offset), samples.map(_.watts).reduce(_ + _) / samples.size)
    )
  }.cache

  val top20 = bests.reduceByKey ((a, b) => if (a._2 > b._2) a else b)
   .map { case ((_, _), keep) => keep }
   .takeOrdered(20)(Ordering.by[((String, Int), Double), Double] { case ((_, _), watts) => -watts})

  app.context.parallelize(top20, app.context.defaultParallelism * 4)
   .join (windowedSamples)
   .map {case ((activity, offset), (watts, samples)) => (watts, samples)}
   .collect
}

Note that I’ve eliminated a lot of tuple transposition and other unnecessary re-keyings. In particular, one of the biggest wins was to use the takeOrdered method with an explicit Ordering to avoid re-keying tuples on mean wattages in order to use take(n). (There are still some tuple re-keyings that could be eliminated; in particular, the final map before the collect could probably be handled more efficiently in the driver in many cases, but we’re talking about marginal gains at this point.)
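
To see why this matters: sortByKey(false).take(n) shuffles the whole RDD into sorted order just so we can keep the first n elements, while takeOrdered computes a bounded top-n per partition and merges those small results on the driver. A trivial illustration (the wattage values here are made up):

// take the two largest values without a full sort-and-shuffle
val wattages = sc.parallelize(Seq(212.0, 305.5, 187.25, 298.0, 310.75))
val topTwo = wattages.takeOrdered(2)(Ordering[Double].reverse)  // Array(310.75, 305.5)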

After these changes, the fused-transformation implementation performed almost exactly as well in terms of wall-clock time as the previous implementation. Since the prior implementation was slightly higher-level, I decided I’d prefer it as a basis for future efforts, but I kept the takeOrdered instead of take(n), which resulted in a small (~5%) speedup.8

Lazily materializing sample windows

The biggest improvement I made was the most invasive, but it addressed the most obvious deficiency of the initial implementation: specifically, it’s terribly wasteful to store and shuffle sample windows that are relatively huge in the aggregate but cheap to compute and unlikely to be needed.

Instead of keeping a collection of sample windows keyed by window identifiers, I elected to summarize efforts by recording Effort structures containing mean wattage, activity name, and starting and ending timestamps. In order to do this, I generalized the sliding-window behavior in my ActivitySliding trait, allowing clients to specify their own transformations for individual windows:

excerpted from common.scala
trait ActivitySliding {
  import org.apache.spark.rdd.RDD
  import com.freevariable.surlaplaque.data.Trackpoint
  import scala.reflect.ClassTag

  def windowsForActivities[U](data: RDD[Trackpoint], period: Int, xform: (Trackpoint => U) = identity _) = {
    applyWindowed(data, period, {case (activity, samples, offset) => ((activity, offset), samples.map(xform))})
  }

  def applyWindowed[U: ClassTag](data: RDD[Trackpoint], period: Int, xform: ((String, Seq[Trackpoint], Int) => U)) = {
    val pairs = data.groupBy((tp:Trackpoint) => tp.activity.getOrElse("UNKNOWN"))
    pairs.flatMap {
      case (activity: String, stp:Seq[Trackpoint]) =>
        (stp sliding period).zipWithIndex.map { case (s, i) => xform(activity, s, i) }
    }
  }

  def applyWindowedNoZip[U: ClassTag](data: RDD[Trackpoint], period: Int, xform: ((String, Seq[Trackpoint]) => U)) = {
    val pairs = data.groupBy((tp:Trackpoint) => tp.activity.getOrElse("UNKNOWN"))
    pairs.flatMap {
      case (activity: String, stp:Seq[Trackpoint]) => (stp sliding period).map { xform(activity, _) }
    }
  }

  def identity(tp: Trackpoint) = tp
}

I was then able to use the applyWindowedNoZip function to generate a collection of tuples in which the keys were start-end cluster pairs and the values were Effort structures. I still needed to filter the whole dataset to find the few windows I cared about after identifying the best efforts for each pair of clusters, but that was pretty straightforward. However, I now needed to make sure I cached that RDD of every trackpoint before entering bestsForPeriod in order to avoid reloading them from disk multiple times! Here’s what the final code looked like:

excerpted from power_bests.scala
def bestsForPeriod(data: RDD[Trackpoint], period: Int, app: SLP, model: Broadcast[KMeansModel]) = {
  val clusteredMMPs = applyWindowedNoZip(data, period, {
      case (activity:String, samples:Seq[Trackpoint]) =>
        (
          (closestCenter(samples.head.latlong, model.value), closestCenter(samples.last.latlong, model.value)),
          Effort(samples.map(_.watts).reduce(_ + _) / samples.size, activity, samples.head.timestamp, samples.last.timestamp)
        )
    })

  clusteredMMPs
   .reduceByKey ((a, b) => if (a.mmp > b.mmp) a else b)
   .takeOrdered(20)(Ordering.by[((Int, Int), Effort), Double] { case (_, e:Effort) => -e.mmp })
   .map {
     case (_, e: Effort) => (
       e.mmp,
       data.filter {
         case tp: Trackpoint => tp.activity.getOrElse("UNKNOWN") == e.activity && tp.timestamp <= e.endTimestamp && tp.timestamp >= e.startTimestamp
       }.collect
     )
   }
}
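
For reference, Effort is just a small summary record; the real definition lives elsewhere in the project, but a minimal sketch consistent with the field names used above would be:

// hedged sketch of the Effort summary record: mean wattage plus enough information
// to find the corresponding trackpoints again later (field names follow the usage
// above; the types are assumptions)
case class Effort(mmp: Double, activity: String, startTimestamp: Long, endTimestamp: Long)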

This code is not particularly more difficult to read than the code we started with, but it is substantially faster; it is over 3.7x faster than the prior version and more than 14x faster than my initial prototype (not shown in this post). There are certainly further improvements to make (even some fairly obvious ones), but an important lesson from this exercise is that Spark not only makes it easy to experiment with novel analytic approaches, it also makes it easy to experiment with improved implementations that still retain their declarative elegance.

Conclusions

This post has shown several simple, orthogonal changes to a Spark application that readily compose together for a huge performance improvement. The changes touched on several broader themes you’ll want to keep in mind as you develop your own Spark applications:

  • Avoid shipping large data in closure environments, but instead prefer broadcast variables;
  • Don’t cache indiscriminately, but be sure that the cost of added memory pressure is worth the savings of avoiding recomputation;
  • Understand Spark’s execution model in order to help Spark help you; and
  • Don’t be featured on the next episode of “Big Data Hoarders,” but rather structure your application to embrace laziness and keep only what you need!

Thanks for reading! If you have further tips, you can reply to this post via Twitter.


  1. See this post for a high-level overview of the technique used by this app, along with some discussion of how the data guided the choices I made.

  2. Well, almost all of the metrics. It doesn’t capture heart rate or cadence because I’m not doing anything with those yet, and it doesn’t include speed sensor data because that can be derived from the coordinates of pairs of trackpoints.

  3. Xiangrui Meng proposed an implementation of sliding windows for RDDs earlier this year; the discussion in that link provides some good background on why exposing sliding-window functionality might be a bad idea. (Briefly, the issue is that cross-partition windows are necessarily very expensive, so as the window size approaches — or surpasses — the number of elements in a partition, performance tanks.) A sliding window implementation is currently available as an internal API for MLlib algorithms, though.

  4. This is the most obvious performance pitfall of this approach, and it was obvious from the beginning. Having the windows around, though, made experimenting with my data much easier. Spoiler alert: we’ll look at what happens when we eliminate this liability last.

  5. Advanced Spark users will also note that this statement involves a small lie, since the zipWithIndex method used in windowsForActivities needs to launch a Spark job to appropriately number records across partitions.

  6. The samples themselves are important since we want to plot the paths on which the best efforts occurred.

  7. A great introductory explanation of how Spark executes jobs is Aaron Davidson’s talk from Spark Summit 2014.

  8. Note that these different approaches may have resulted in similar wall-clock times but could have higher or lower utilization — and thus different ultimate performance characteristics — depending on your environment. Be sure to test possible improvements with your code and data!

Consider the collection1 of all contiguous subsequences of a sequence. If we’re talking about a stream of n observations, this could be the multiset containing every possible window over those observations: 1 window of n samples, 2 windows of n-1 samples, 3 windows of n-2 samples, …, and n “windows” of 1 sample each. If we’re talking about a string of symbols, e.g., abcde, then we’d be talking about the following set of substrings:

{
  abcde, 
  abcd, bcde, 
  abc, bcd, cde,
  ab, bc, cd, de,
  a, b, c, d, e
}
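
Here’s a quick Scala sketch of enumerating this collection (powerSubstrings is just a placeholder name, since finding the right name is the open question below):

// every contiguous, non-empty subsequence of xs, longest first (as in the example above)
def powerSubstrings[A](xs: Seq[A]): Seq[Seq[A]] =
  for {
    len <- xs.length to 1 by -1
    window <- xs.sliding(len)
  } yield window

// powerSubstrings("abcde".toSeq).map(_.mkString)
// yields: abcde, abcd, bcde, abc, bcd, cde, ab, bc, cd, de, a, b, c, d, e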

Given a sequence of n elements, there will be T(n) subsequences in such a collection, where T(n) = n(n+1)/2 is the nth triangular number (there are n - k + 1 windows of length k, for each k from 1 to n). If we must store each subsequence without using some representation where two subsequences can share the elements they have in common (e.g., if we were to store each as a distinct C-style array), the space requirements to do so explode quickly:

space complexity plot

Anyway, I’m curious if this concept has a name. I’ve been thinking of it as a “power substring,” analogously to a powerset. If you know of a name for it, please let me know!


  1. I find myself thinking of subsequence identity as depending upon its position in the parent sequence, and thus thinking of this collection as a set even if it could contain multiple subsequences with identical elements.

If you’re like me, you often find yourself pasting transcripts into sbt console sessions in order to interactively test out new app functionality. A lot of times, these transcripts have a great deal of boilerplate and can be dramatically simplified. Fortunately, sbt provides a facility to let you specify what commands should run at the beginning of a console session: the project-specific initialCommands setting.1 sbt also provides a cleanupCommands setting to specify commands to run when your REPL exits, so if you’re testing anything that needs to have some cleanup code run before the JVM terminates, you can have that done automatically as well. (This is also useful to avoid ugly stack traces when developing Spark applications and quitting the console before stopping your SparkContext.) Finally, since sbt build definitions are just Scala code, you can conditionalize these command sets, for example, to only load test fixtures sometimes.
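
In its simplest form, this is just a pair of settings whose values are strings of Scala statements; here’s a minimal sketch (the statements themselves are placeholders):

// runs at the start of every `sbt console` session for this project
initialCommands in console := """println("starting a REPL session")"""

// runs when the console session exits
cleanupCommands in console := """println("cleaning up")"""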

Here’s what this sort of automation looks like in a real build definition, with an excerpt of the Build.scala file from a (not-yet-merged) feature branch on my sur-la-plaque project, showing how I added automation to the REPL for the analysis subproject:2

excerpt of Build.scala
def optionallySetupFixtures = {
  sys.env.get("SLP_FIXTURES_FROM") match {
    case Some(dir: String) => s"""
      |val data = app.processFiles(SLP.listFilesInDir("$dir"))
      |data.registerAsTable("trackpoints")
    """.stripMargin
    case _ => ""
  }
}

def analysisSettings = baseSettings ++ sparkSettings ++ breezeSettings ++ dispatchSettings ++ testSettings ++ Seq(
  initialCommands in console :=
    """
      |import org.apache.spark.SparkConf
      |import org.apache.spark.SparkContext
      |import org.apache.spark.rdd.RDD
      |import com.freevariable.surlaplaque.importer._
      |import com.freevariable.surlaplaque.data._
      |import com.freevariable.surlaplaque.app._
      |
      |val conf = new SparkConf().setMaster("local[8]").setAppName("console").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      |val sc = new SparkContext(conf)
      |val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      |val app = new SLP(sc)
      |import sqlContext._
      |
    """.stripMargin + optionallySetupFixtures,
  cleanupCommands in console := "app.stop"
)

First, I’ve declared a simple optionallySetupFixtures function that generates code to load test data and register it with Spark SQL, but only if SLP_FIXTURES_FROM is set in the environment with the name of a directory containing activity files. The analysisSettings function returns a Seq of settings for the analysis subproject, first combining common settings, test settings, and library-specific settings for its dependencies (these are all declared elsewhere in the file). To this combination of common settings, we then add

  1. an initialCommands setting to ensure that our REPL session imports Spark and sur-la-plaque libraries and sets up SparkContext and SQLContext instances, and
  2. a cleanupCommands setting to gracefully shut down the SparkContext when we exit the REPL (via the stop method in the SLP application class)

Note that the initialCommands setting is the result of appending the static settings (our imports and variable declarations) with the result of calling optionallySetupFixtures, which will either be code to load and register our data or nothing, depending on our environment.

This functionality makes it easy to develop custom REPL environments or just save a lot of time while interactively experimenting with new techniques in your project. Even better, the investment required is absolutely minimal compared to the payoff of not having to paste or type boilerplate code into every REPL session.


  1. This feature is mentioned in the documentation and used by Apache Spark, so I was familiar with it, but — for whatever reason — I hadn’t thought to apply it to my own projects until recently. I’m mentioning it here in case you didn’t think of it either!

  2. sur-la-plaque is a collection of applications dedicated to making sense of bicycling activity data; you can read more about some of the tools it includes here. The code is currently structured in two sbt projects: one for analysis code that actually processes data, and one that provides a web-based viewer of analysis results.