As everyone knows, internet comments are pure, unadulterated evil.1 But it’s nonetheless nice when blogs give readers an easy way to provide feedback. I recently reworked my Octopress installation to allow readers to reply to my posts via Twitter or email; this post will show you how to set it up for yourself. We’ll be using Twitter’s Web Intents API to make it happen.

Step 1: Configuration

If you haven’t already set your Twitter username in your Octopress _config.yml, do so now; here’s what mine looks like:

excerpted from _config.yml
# Twitter
twitter_user: willb
twitter_tweet_button: false

Step 2: Create a new partial layout

The next thing we’ll do is add a new partial layout that generates reply-via-Twitter links. Create source/_includes/post/feedback.html in your Octopress directory and give it the following contents:

source/_includes/post/feedback.html
You may
{% if site.twitter_user and page.skip_twitter_feedback != true %}
  <script type="text/javascript" src="//platform.twitter.com/widgets.js"></script>
  {% capture tweet_text %}@{{site.twitter_user}} re: {{site.url}}{{page.url}}{% endcapture %}
  <a href="https://twitter.com/intent/tweet?text={{ tweet_text | uri_escape }}">reply to this post on Twitter</a> or
{% endif %}
<a href="mailto:{{site.email}}">email the author</a>.

Note that this will always provide an email feedback link, targeting the site-wide email variable from your _config.yml.2 It will provide a reply-via-Twitter link if you’ve set the twitter_user configuration variable and haven’t set skip_twitter_feedback: true in your post’s YAML front matter. (I use this to eliminate redundancy for cases like this one, in which I’ve asked for replies via Twitter in the body of the post.) When a reader clicks on the reply-via-Twitter link, it will take them to Twitter3 with a prepopulated tweet:

Web Intent

Step 3: Add the new partial to your post template

Finally, edit your post template to ensure that it includes feedback.html in an appropriate place. This will obviously depend on your theme and your taste, but I’ve included my post template — which is based on octostrap3 and includes the feedback links directly following the category list — as an example; see line 19:

source/_layouts/post.html
---
layout: default
navbar: Blog
single: true
---

<div class="row">
  <div class="page-content {% unless page.sidebar == false or site.post_asides == empty and site.default_asides == empty %}col-md-9{% else %}col-md-12{% endunless %}" itemscope itemtype="http://schema.org/Blog">
    <meta itemprop="name" content="{{site.title}}" />
    <meta itemprop="description" content="{{site.description}}" />
    <meta itemprop="url" content="{{site.url}}" />
    <article class="hentry" role="article" itemprop="blogPost" itemscope itemtype="http://schema.org/BlogPosting">
      {% include article.html %}
      <footer>
        <p class="meta text-muted">
          {% include post/author.html %}
          {% include post/date.html %}{% if updated %}{{ updated }}{% else %}{{ time }}{% endif %}
          {% include post/categories.html %}
          {% include post/feedback.html %}
        </p>
        {% unless page.sharing == false %}
          {% include post/sharing.html %}
        {% endunless %}
        {% if page.previous.url or page.next.url %}
          <ul class="meta text-muted pager">
            {% if page.previous.url %}
            <li class="previous"><a href="{{page.previous.url}}" title="Previous Post: {{page.previous.title}}">&laquo; {{page.previous.title}}</a></li>
            {% endif %}
            {% if page.next.url %}
            <li class="next"><a href="{{page.next.url}}" title="Next Post: {{page.next.title}}">{{page.next.title}} &raquo;</a></li>
            {% endif %}
          </ul>
        {% endif %}
      </footer>
    </article>

    {% if site.disqus_short_name and page.comments == true %}
      <section>
        <h1>Comments</h1>
        <div id="disqus_thread" aria-live="polite">{% include post/disqus_thread.html %}</div>
      </section>
    {% endif %}
  </div>

  {% unless page.sidebar == false or site.post_asides == empty and site.default_asides == empty %}
  <aside class="sidebar col-md-3">
    {% if site.post_asides.size %}
      {% include_array post_asides %}
    {% else %}
      {% include_array default_asides %}
    {% endif %}
  </aside>
  {% endunless %}
</div>

Step 4: There is no Step 4

I hope this was helpful! If you have feedback on this post, I bet you can figure out where to find me.


  1. Only mostly kidding.

  2. If you haven’t set this variable, you can replace {{site.email}} with your email address. You can also replace the whole mailto link with the results of some obfuscation technique like the Hivelogic Enkoder if you still care about concealing your email address from spambots.

  3. By default, the link will open in a pop-up window unless your readers are blocking JavaScript from Twitter. If you’re interested in avoiding pulling down scripts from external sites, note that eliminating the script link to platform.twitter.com/widgets.js will only disable the pop-up window; it won’t affect reply-via-tweet functionality in general. (You can, of course, use well-known techniques to open the link in a pop-up window without the external script.)

One of my side projects this year has been using Apache Spark to make sense of my bike power meter data. There are a few well-understood approaches to bike power data modeling and analysis, but the domain has been underserved by traditional machine learning approaches, and I wanted to see if I could quickly develop some novel techniques. My experiments so far are available as open-source code and they have been quite successful in two ways: not only have I mined some useful information from my rides, but I’ve also demonstrated that Spark is responsive enough to be a good substitute for R in exploratory use cases.

However, the code I’ve produced up to this point has been focused more on experimenting with analytic techniques than on scalable implementations. I’ve valued obvious correctness and simplicity over performance, in some cases making implementation decisions that were obviously suboptimal in the interest of keeping the code simple. (The blessing and the curse of declarative programming is that you’re insulated, to some extent, from how your application actually executes!)

Since publishing the code, giving talks about it, and demoing it, I’ve decided that the basic approach is fairly sensible. I’ve also seen a lot of interest from teammates and other cyclists who’d like to analyze their own data, and I’d like to eventually make the code available as a public-facing service — which means that it’s time to adjust my tradeoffs to favor efficiency a bit more.

In this post, we’ll look at several minor changes (and one major change) that improved my code’s performance, identifying the common issues that each uncovered. This certainly won’t be an exhaustive catalog of Spark optimization tips, and I’m not claiming that the code I wound up with is perfect, but keeping these issues in mind should improve your Spark apps. We’ll cover using broadcast variables to distribute data to workers, using caching sensibly, being sensitive to data shuffles, and designing applications to take advantage of laziness.

Background

The particular code we’re looking at is from one of the applications I discussed at Spark Summit this year.1 The question this app aims to answer is “given historical ride data, where am I likely to get my best mean n-second power?” Its basic approach is:

  1. load activity data from disk, transforming it into a collection of Trackpoint records,
  2. optimize a k-means clustering (with a relatively large k) for the spatial locations of each Trackpoint,
  3. find the mean power for each overlapping n-second window of each activity,
  4. for each pair (i, j) of spatial clusters, reject all but the best n-second efforts starting in cluster i and ending in cluster j, and
  5. finally, plot the trackpoints from the best remaining 20 efforts

The Trackpoint structure is designed to capture all the metrics2 in each sample recorded in a Garmin TCX file; it is a Scala case class with timestamp, coordinates, altitude, wattage, and activity identity. Once we’ve processed our input files into Trackpoint records, we’ll group these records by activity, giving us a collection of pairs that encode a mapping from activity names to sequences of trackpoints, like this:

Schematic of per-activity trackpoint collections
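
For reference, here’s a minimal sketch of what such a record might look like; the field names and types are inferred from how the code below uses them (latlong, watts, timestamp, activity), and the real definitions live in the sur-la-plaque repository:

Hypothetical Trackpoint sketch
// A sketch only; the actual classes are in com.freevariable.surlaplaque.data
case class Coordinates(lat: Double, lon: Double)

case class Trackpoint(
  timestamp: Long,           // when the sample was recorded
  latlong: Coordinates,      // spatial location of the sample
  altitude: Double,          // elevation
  watts: Double,             // instantaneous power
  activity: Option[String]   // identifier of the containing activity
)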

Since dealing with windowed data is common to several of my bike data applications, I had factored it out into a common trait:

excerpted from common.scala
trait ActivitySliding {
  import org.apache.spark.rdd.RDD
  import com.freevariable.surlaplaque.data.Trackpoint

  def windowsForActivities[U](data: RDD[Trackpoint], period: Int, xform: (Trackpoint => U) = identity _) = {
    val pairs = data.groupBy((tp:Trackpoint) => tp.activity.getOrElse("UNKNOWN"))
    pairs.flatMap({case (activity:String, stp:Seq[Trackpoint]) => (stp sliding period).zipWithIndex.map {case (s,i) => ((activity, i), s.map(xform))}})
  }

  def identity(tp: Trackpoint) = tp
}

The windowsForActivities function maps the data into overlapping period-sample windows. Since it’s not possible, in general, to efficiently calculate sliding windows over RDDs3, we operate on materialized sequences of trackpoints, and return pairs representing a mapping from pairs of activity identifiers and sample offsets to sequences representing sample windows, like this:

Diagram of sample windows
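
To make the windowing mechanics concrete, here’s what sliding and zipWithIndex do to a toy sequence (this is plain Scala collections code, with no Spark involved):

Sliding windows over a toy sequence
scala> List(1, 2, 3, 4, 5).sliding(3).zipWithIndex.toList
res0: List[(List[Int], Int)] = List((List(1, 2, 3),0), (List(2, 3, 4),1), (List(3, 4, 5),2))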

The windowsForActivities function also takes an optional transformation function argument (xform), which can be used to map Trackpoint records after splitting them into sliding windows. In my case, I use these to discard metrics data that isn’t relevant to a given application. Since our first approach to finding best efforts involves keeping every effort window around until we know which ones we’ll ultimately care about,4 going to a leaner trackpoint structure can save a lot of memory and improve performance. In the case of this application, translating trackpoint records to simpler records (containing only latitude, longitude, and wattage) resulted in approximately a 2x speedup on my data.
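
As an example of the kind of transformation I pass for xform, here’s a sketch of such a stripping function; the name LeanTrackpoint and the exact shape of the real stripTrackpoints (declared elsewhere in the application) are assumptions:

Hypothetical trackpoint-stripping sketch
// Keep only the fields this application actually needs
case class LeanTrackpoint(latlong: Coordinates, watts: Double)

def stripTrackpoints(tp: Trackpoint): LeanTrackpoint =
  LeanTrackpoint(tp.latlong, tp.watts)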

In the rest of this post, we’ll look at the initial approach and some iterative refinements. For each refinement, we’ll look at the general Spark application performance issues it uncovered as well as the particular speedups I saw. (We’ll also look at a change that I thought would improve performance but didn’t.) First, though, a word of caution: I tried to do meaningful and repeatable timings, but I was working on my personal workstation (hardly a controlled environment) and didn’t do anything resembling a rigorous performance evaluation. Furthermore, your mileage may vary; be sure to thoroughly test improvements on your own applications and data!

The first pass

We’ll be looking at a small function in which PowerBestsApp spends most of its time; this is the function that finds the best n-sample efforts after filtering out all but the best effort starting and ending in a given pair of clusters. Initially, it looks like this:

excerpted from power_bests.scala
def bestsForPeriod(data: RDD[Trackpoint], period: Int, app: SLP, model: KMeansModel) = {
  val windowedSamples = windowsForActivities(data, period, stripTrackpoints _).cache
  val clusterPairs = windowedSamples
    .map {case ((activity, offset), samples) => ((activity, offset), (closestCenter(samples.head.latlong, model), closestCenter(samples.last.latlong, model)))}
  val mmps = windowedSamples.map {case ((activity, offset), samples) => ((activity, offset), samples.map(_.watts).reduce(_ + _) / samples.size)}

  val top20 = mmps.join(clusterPairs)
   .map {case ((activity, offset), (watts, (headCluster, tailCluster))) => ((headCluster, tailCluster), (watts, (activity, offset)))}
   .reduceByKey ((a, b) => if (a._1 > b._1) a else b)
   .map {case ((headCluster, tailCluster), (watts, (activity, offset))) => (watts, (activity, offset))}
   .sortByKey(false)
   .take(20)

  app.context.parallelize(top20, app.context.defaultParallelism * 4)
   .map {case (watts, (activity, offset)) => ((activity, offset), watts)}
   .join (windowedSamples)
   .map {case ((activity, offset), (watts, samples)) => (watts, samples)}
   .collect
}

When we invoke bestsForPeriod, we already have an RDD of trackpoint records corresponding to every sample in every activity under consideration. We have also optimized a set of k cluster centers (I typically use k=128 for my data) and have the resulting model available. In the body of this function, we:

  • calculate and cache the overlapping sample windows for each activity, converting each Trackpoint record (with the stripTrackpoints function, declared elsewhere) into a minimal record consisting only of latitude, longitude, and wattage (line 2),
  • we declare clusterPairs, an RDD of pairs in which window identifiers (that is, activity-offset pairs) are the keys and pairs of cluster centers (one for the starting cluster and one for the ending cluster) are the values (lines 3—4),
  • we declare mmps, an RDD of pairs in which the keys are window identifiers and the values are mean wattages for the corresponding window of samples (line 5),
  • we then find the top twenty efforts, only considering the best effort starting and ending in a given pair of clusters:
    • first, we join mmps with clusterPairs, giving us an RDD mapping from window identifiers to tuples of mean wattages and start-end cluster pairs (line 7),
    • we transform this mapping so that it’s keyed by the start-end clusters of each window (line 8),
    • and reject all but the best effort for each start-end cluster pair (line 9),
    • we then re-key the resulting pairs by mean wattage (line 10),
    • and sort by mean wattage in descending (hence false) order (line 11)
    • now, up until this point, we haven’t actually done anything besides construct a directed acyclic graph of RDD dependencies, since we’ve exclusively invoked RDD transformations, which are lazy.5 However, in line 12, we’re going to force computation of the mean wattage-window identifier pairs and send the twenty best ones back to the driver program
  • we convert the collection of pairs back to an RDD (line 14; note that we’re assuming low default parallelism or possible expansion to consider more than 20 best efforts), and immediately transpose each pair to re-key by window identifier (line 15),
  • join this collection with the windows themselves (line 16), resulting in a map from window identifiers to tuples of mean wattages and the samples themselves6 that contains only our best 20 efforts, and
  • strip the window identifier from each tuple, leaving only pairs of wattages and windows (line 17) before firing off a Spark job to actually do the computations we described in lines 14—17, returning the results to the driver program (line 18)

This code runs quickly enough for prototyping, but it could be a lot faster. Experienced Spark programmers likely see several opportunities for improvement here; hopefully the changes we’re about to make will cover most of them.

Broadcasting: a simple win

The first and easiest change we can make is to ensure that we aren’t passing too much data in closure environments, since that data has to be serialized and shipped to workers (in my case, on a single multiprocessor workstation, but potentially across a cluster). Line 4 in our original function relied on having a KMeansModel available in the closure’s environment. Now, a k-means model for 128 cluster centers isn’t huge by any means, but it’s also not a trivial thing to have to serialize along with a relatively small closure. By passing the model in a Spark broadcast variable instead, we can reduce the cost of creating and executing the RDD transformations that depend on it:

excerpted from power_bests.scala
def bestsForPeriod(data: RDD[Trackpoint], period: Int, app: SLP, model: Broadcast[KMeansModel]) = {
  val windowedSamples = windowsForActivities(data, period, stripTrackpoints _).cache
  val clusterPairs = windowedSamples
    .map {case ((activity, offset), samples) => ((activity, offset), (closestCenter(samples.head.latlong, model.value), closestCenter(samples.last.latlong, model.value)))}

    // ... rest of function omitted
}

Note that all I did here was change bestsForPeriod to expect a Spark broadcast variable instead of a raw KMeansModel, and then pass model.value to the closestCenter function where it had previously expected a KMeansModel. This very simple change resulted in an approximately 1.1x speedup on my code. If you have large read-only data (whether bigger models or anything else) that needs to go to Spark workers with closure arguments, you might see even bigger improvements.

To create a broadcast variable, simply call the broadcast method of SparkContext with the value you wish to broadcast as a parameter:

Broadcast variable example
scala> val evenNumbers = (0 to 100000 by 2).toArray
evenNumbers: Array[Int] = Array(0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198, 200, 202, 204, 206, 208, 210, 212, 214, 216, 218, 220, 222, 224, 226, 228, 230, 232, 234, 236, 238, 240, 242, 244, 246, 248, 250, 252, 254, 256, 258, 260, 262, 264, 266, 268, 270, 272, 274, 276, 278, 280, 282, 284, 286, 288, 290, 292, 294, 296, 298, 300, 302, 304, 306, 308, 310, 312, 314, 316, 318, 320, 322, 324, 326, ...
scala> val broadcastEvens = sc.broadcast(evenNumbers)
14/09/09 14:45:40 INFO storage.MemoryStore: ensureFreeSpace(200048) called with curMem=0, maxMem=9239474995
14/09/09 14:45:40 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 195.4 KB, free 8.6 GB)
broadcastEvens: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
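
At the call site in my application, the only other change is to broadcast the trained model before passing it in. A sketch, assuming app.context exposes the SparkContext (as in the excerpts above) and kmeansModel is the model we trained earlier:

Broadcasting the model (sketch)
// kmeansModel and data are assumed to be the trained model and the trackpoint RDD
val broadcastModel = app.context.broadcast(kmeansModel)
val results = bestsForPeriod(data, period, app, broadcastModel)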

Eliminating counterproductive caching

One of the big benefits to using Spark over other distributed computing frameworks is that intermediate results can be cached in cluster memory. However, caching can be both a gift and a curse: it’s always possible to cache something, even something you use many times, whose impact on your memory usage far outweighs the cost of recomputing it. In our case, the largest collection we deal with is the windowedSamples RDD, which, depending on the window size, can be hundreds of times larger than the input data. Actually computing the windows is not particularly expensive, they aren’t used many times, and we use only a tiny fraction of the windows at the end of the function in any case. By eliminating the .cache call on windowedSamples, we acknowledge that it is cheaper to recompute the few windows we’ll need to examine more than once than it is to dedicate the memory necessary to keeping the windows around. This change resulted in an approximately 1.75x speedup on my code.

I hesitate to call this a simple fix, since it requires some insight into your application. But as we’ve already seen with the Trackpoint transformation, memory pressure can make a huge difference to Spark applications. Just because you can cache something doesn’t mean you should. (This is a lesson that users of tabled Prolog systems learn quickly: if you start by tabling — that is, memoizing — every relation in your database, you’ll have a fast, expressive environment until everything explodes and your computer becomes a pile of glowing sand.)
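
The flip side, which comes up again later in this post, is that the comparatively small RDD of raw trackpoint records is worth caching, since it would otherwise be reloaded from disk by each job that uses it. Here’s a sketch of the tradeoff, assuming the processFiles and listFilesInDir helpers from the sur-la-plaque codebase and a directory name in dir:

Caching tradeoff sketch
// Cache the small, frequently reused RDD of raw trackpoints...
val data = app.processFiles(SLP.listFilesInDir(dir)).cache()

// ...but don't cache the huge, cheap-to-recompute RDD of windows
val windowedSamples = windowsForActivities(data, period, stripTrackpoints _)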

Being sensitive to shuffles

Recall that when Spark actually executes a job, it pipelines as many individual tasks as possible into stages, which can be executed on workers without coordinating or shuffling data.7 Ideally, stages will consist of several tasks to allow for high utilization; intuitively, we want workers to spend far more time computing results than coordinating with the driver. However, when RDD transformations change the keys in a collection of pairs (for example), we force Spark to coordinate between workers, shuffling RDD elements around in order to repartition them.
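
One way to see where shuffles will occur is to inspect an RDD’s lineage with toDebugString; transformations that repartition by key (like reduceByKey or join) introduce a ShuffledRDD, and each ShuffledRDD marks a stage boundary. A quick sketch in the Spark shell, where sc is the already-created SparkContext (exact output varies by Spark version):

Spotting stage boundaries with toDebugString
// A trivial pair RDD and a keyed reduction
val pairs = sc.parallelize(1 to 10000).map { i => (i % 10, i) }

// The printed lineage shows a ShuffledRDD sitting on top of the RDDs
// produced by parallelize and map: that's where the shuffle happens.
println(pairs.reduceByKey(_ + _).toDebugString)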

An interesting non-improvement

Since our application deals with some fairly large collections (windowedSamples), shuffling RDDs can be especially painful. However, because of the way we want to use the data, it’s hard to eliminate all of the shuffles in our method. So instead, we’ll examine the impact of the shuffle by looking at another version of bestsForPeriod that starts by fusing the two transformations on windowedSamples (from lines 3—5 in our original excerpt) into one transformation:

excerpted from power_bests.scala with transformation fusion
def bestsForPeriod(data: RDD[Trackpoint], period: Int, app: SLP, model: Broadcast[KMeansModel]) = {
  val windowedSamples = windowsForActivities(data, period, stripTrackpoints _)
  val bests = windowedSamples.map {
    case ((activity, offset), samples)  =>
      (
        (closestCenter(samples.head.latlong, model.value), closestCenter(samples.last.latlong, model.value)),
        (samples.map(_.watts).reduce(_ + _) / samples.size, (activity, offset))
      )
  }

  val top20 = bests.reduceByKey ((a, b) => if (a._1 > b._1) a else b)
   .map {case ((headCluster, tailCluster), (watts, (activity, offset))) => (watts, (activity, offset))}
   .sortByKey(false)
   .take(20)

  app.context.parallelize(top20, app.context.defaultParallelism * 4)
   .map {case (watts, (activity, offset)) => ((activity, offset), watts)}
   .join (windowedSamples)
   .map {case ((activity, offset), (watts, samples)) => (watts, samples)}
   .collect
}

This transformation fusion essentially eliminates one of the map transformations and also a join, giving us a collection of tuples of start-end cluster pairs and pairs of mean wattages and window identifiers. Interestingly, eliminating these transformations made the application slower! In fact, it ran approximately 19% slower than the previous version. However, this code affords more opportunities to eliminate re-keying and shuffles, and in so doing, we can once again achieve comparable performance to the previous version.

Here’s where the code wound up after several transformations:

excerpted from improved power_bests.scala with transformation fusion
def bestsForPeriod(data: RDD[Trackpoint], period: Int, app: SLP, model: Broadcast[KMeansModel]) = {
  val windowedSamples = windowsForActivities(data, period, stripTrackpoints _)

  val bests = windowedSamples.map {
    case ((activity, offset), samples)  => (
      (closestCenter(samples.head.latlong, model.value), closestCenter(samples.last.latlong, model.value)),
      ((activity, offset), samples.map(_.watts).reduce(_ + _) / samples.size)
    )
  }.cache

  val top20 = bests.reduceByKey ((a, b) => if (a._2 > b._2) a else b)
   .map { case ((_, _), keep) => keep }
   .takeOrdered(20)(Ordering.by[((String, Int), Double), Double] { case ((_, _), watts) => -watts})

  app.context.parallelize(top20, app.context.defaultParallelism * 4)
   .join (windowedSamples)
   .map {case ((activity, offset), (watts, samples)) => (watts, samples)}
   .collect
}

Note that I’ve eliminated a lot of tuple transposition and other unnecessary re-keyings. In particular, one of the biggest wins was to use the takeOrdered method with an explicit Ordering to avoid re-keying tuples on mean wattages in order to use take(n). (There are still some tuple re-keyings that could be eliminated; in particular, the final map transformation before the collect could probably be handled more efficiently in the driver in many cases, but we’re talking about marginal gains at this point.)
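
As a standalone illustration of the takeOrdered change: given an explicit Ordering, takeOrdered pulls the smallest elements under that ordering straight back to the driver, so negating the wattage gives us a descending top-k with no sortByKey and no re-keying. For example, in the shell:

takeOrdered with an explicit Ordering
val efforts = sc.parallelize(Seq(("a", 3.0), ("b", 9.0), ("c", 1.0)))

// returns Array((b,9.0), (a,3.0)): the two best efforts, best first
efforts.takeOrdered(2)(Ordering.by[(String, Double), Double] { case (_, watts) => -watts })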

After these changes, the fused-transformation implementation performed almost exactly as well in terms of wall-clock time as the previous implementation. Since the prior implementation was slightly higher-level, I decided I’d prefer it as a basis for future efforts, but I kept the takeOrdered instead of take(n), which resulted in a small (~5%) speedup.8

Lazily materializing sample windows

The biggest improvement I made was the most invasive, but it addressed the most obvious deficiency of the initial implementation: specifically, it’s terribly wasteful to store and shuffle sample windows that are relatively huge in the aggregate but cheap to compute and unlikely to be needed.

Instead of keeping a collection of sample windows keyed by window identifiers, I elected to summarize efforts by recording Effort structures containing mean wattage, activity name, and starting and ending timestamps. In order to do this, I generalized the sliding-window behavior from my ActivitySliding trait, allowing clients to specify their own transformations for individual windows:

excerpted from common.scala
trait ActivitySliding {
  import org.apache.spark.rdd.RDD
  import com.freevariable.surlaplaque.data.Trackpoint

  def windowsForActivities[U](data: RDD[Trackpoint], period: Int, xform: (Trackpoint => U) = identity _) = {
    applyWindowed(data, period, {case (activity, samples, offset) => ((activity, offset), samples.map(xform))})
  }

  def applyWindowed[U: ClassTag](data: RDD[Trackpoint], period: Int, xform: ((String, Seq[Trackpoint], Int) => U)) = {
    val pairs = data.groupBy((tp:Trackpoint) => tp.activity.getOrElse("UNKNOWN"))
    pairs.flatMap {
      case (activity: String, stp:Seq[Trackpoint]) =>
        (stp sliding period).zipWithIndex.map { case (s, i) => xform(activity, s, i) }
    }
  }

  def applyWindowedNoZip[U: ClassTag](data: RDD[Trackpoint], period: Int, xform: ((String, Seq[Trackpoint]) => U)) = {
    val pairs = data.groupBy((tp:Trackpoint) => tp.activity.getOrElse("UNKNOWN"))
    pairs.flatMap {
      case (activity: String, stp:Seq[Trackpoint]) => (stp sliding period).map { xform(activity, _) }
    }
  }

  def identity(tp: Trackpoint) = tp
}
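
Before looking at the new bestsForPeriod, here’s a sketch of the Effort summary record it uses; the field names are inferred from the usage below, and the timestamps are shown as Long although the real representation may differ:

Hypothetical Effort sketch
// A compact summary of one n-second window, replacing the full sequence
// of samples until we know which efforts we actually care about
case class Effort(mmp: Double,           // mean power for the window
                  activity: String,      // which activity it came from
                  startTimestamp: Long,  // timestamp of the first sample
                  endTimestamp: Long)    // timestamp of the last sample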

I was then able to use the applyWindowedNoZip function to generate a collection of tuples in which the keys were start-end cluster pairs and the values were Effort structures. I still needed to filter the whole dataset to find the few windows I cared about after identifying the best efforts for each pair of clusters, but that was pretty straightforward. However, I now needed to make sure I cached that RDD of every trackpoint before entering bestsForPeriod in order to avoid reloading them from disk multiple times! Here’s what the final code looked like:

excerpted from power_bests.scala
def bestsForPeriod(data: RDD[Trackpoint], period: Int, app: SLP, model: Broadcast[KMeansModel]) = {
  val clusteredMMPs = applyWindowedNoZip(data, period, {
      case (activity:String, samples:Seq[Trackpoint]) =>
        (
          (closestCenter(samples.head.latlong, model.value), closestCenter(samples.last.latlong, model.value)),
          Effort(samples.map(_.watts).reduce(_ + _) / samples.size, activity, samples.head.timestamp, samples.last.timestamp)
        )
    })

  clusteredMMPs
   .reduceByKey ((a, b) => if (a.mmp > b.mmp) a else b)
   .takeOrdered(20)(Ordering.by[((Int, Int), Effort), Double] { case (_, e:Effort) => -e.mmp })
   .map {
     case (_, e: Effort) => (
       e.mmp,
       data.filter {
         case tp: Trackpoint => tp.activity.getOrElse("UNKNOWN") == e.activity && tp.timestamp <= e.endTimestamp && tp.timestamp >= e.startTimestamp
       }.collect
     )
   }
}

This code is not particularly more difficult to read than the code we started with, but it is substantially faster; it is over 3.7x faster than the prior version and more than 14x faster than my initial prototype (not shown in this post). There are certainly further improvements to make (even some fairly obvious ones), but an important lesson from this exercise is that Spark not only makes it easy to experiment with novel analytic approaches, it also makes it easy to experiment with improved implementations that still retain their declarative elegance.

Conclusions

This post has shown several simple, orthogonal changes to a Spark application that readily compose together for a huge performance improvement. The changes touched on several broader themes you’ll want to keep in mind as you develop your own Spark applications:

  • Avoid shipping large data in closure environments, but instead prefer broadcast variables;
  • Don’t cache indiscriminately, but be sure that the cost of added memory pressure is worth the savings of avoiding recomputation;
  • Understand Spark’s execution model in order to help Spark help you; and
  • Don’t be featured on the next episode of “Big Data Hoarders,” but rather structure your application to embrace laziness and keep only what you need!

Thanks for reading! If you have further tips, you can reply to this post via Twitter.


  1. See this post for a high-level overview of the technique used by this app, along with some discussion of how the data guided the choices I made.

  2. Well, almost all of the metrics. It doesn’t capture heart rate or cadence because I’m not doing anything with those yet, and it doesn’t include speed sensor data because that can be derived from the coordinates of pairs of trackpoints.

  3. Xiangrui Meng proposed an implementation of sliding windows for RDDs earlier this year; the discussion in that link provides some good background on why exposing sliding-window functionality might be a bad idea. (Briefly, the issue is that cross-partition windows are necessarily very expensive, so as the window size approaches — or surpasses — the number of elements in a partition, performance tanks.) A sliding window implementation is currently available as an internal API for MLlib algorithms, though.

  4. This is the most obvious performance pitfall of this approach, and it was obvious from the beginning. Having the windows around, though, made experimenting with my data much easier. Spoiler alert: we’ll look at what happens when we eliminate this liability last.

  5. Advanced Spark users will also note that this statement involves a small lie, since the zipWithIndex method used in windowsForActivities needs to launch a Spark job to appropriately number records across partitions.

  6. The samples themselves are important since we want to plot the paths on which the best efforts occurred.

  7. A great introductory explanation of how Spark executes jobs is Aaron Davidson’s talk from Spark Summit 2014.

  8. Note that these different approaches may have resulted in similar wall-clock times but could have higher or lower utilization — and thus different ultimate performance characteristics — depending on your environment. Be sure to test possible improvements with your code and data!

Consider the collection1 of all contiguous subsequences of a sequence. If we’re talking about a stream of n observations, this collection would be the multiset containing every possible window over these observations: 1 window of n samples, 2 windows of n-1 samples, 3 windows of n-2 samples, …, and n “windows” of 1 sample each. If we’re talking about a string of symbols, e.g., abcde, then we’d be talking about the following set of substrings:

{
  abcde, 
  abcd, bcde, 
  abc, bcd, cde,
  ab, bc, cd, de,
  a, b, c, d, e
}

Given a sequence of n elements, there will be T(n) subsequences in such a collection, where T(n) is the nth triangular number. If we must store each subsequence without using some representation where two subsequences can share elements they have in common (e.g., if we were to store each as a distinct C-style array), the space requirements to do so explode quickly:

space complexity plot
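
To quantify “explode quickly”: T(n) = n(n+1)/2 counts the subsequences (for n = 5, that’s the 15 substrings of abcde above), and if no elements can be shared, the total number of stored elements is the sum over window lengths k of k(n-k+1), which works out to n(n+1)(n+2)/6 and therefore grows cubically. A quick sanity check in the REPL:

Counting subsequences and stored elements
scala> def subseqCount(n: Int) = n * (n + 1) / 2             // T(n): number of subsequences
subseqCount: (n: Int)Int

scala> def elementCount(n: Int) = n * (n + 1) * (n + 2) / 6  // elements stored with no sharing
elementCount: (n: Int)Int

scala> (1 to 5).map(n => (n, subseqCount(n), elementCount(n)))
res0: scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,3,4), (3,6,10), (4,10,20), (5,15,35))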

Anyway, I’m curious if this concept has a name. I’ve been thinking of it as a “power substring,” analogously to a powerset. If you know of a name for it, please let me know!


  1. I find myself thinking of subsequence identity as depending upon its position in the parent sequence, and thus thinking of this collection as a set even if it could contain multiple subsequences with identical elements.

If you’re like me, you often find yourself pasting transcripts into sbt console sessions in order to interactively test out new app functionality. A lot of times, these transcripts have a great deal of boilerplate and can be dramatically simplified. Fortunately, sbt provides a facility to let you specify what commands should run at the beginning of a console session: the project-specific initialCommands setting.1 sbt also provides a cleanupCommands setting to specify commands to run when your REPL exits, so if you’re testing anything that needs to have some cleanup code run before the JVM terminates, you can have that done automatically as well. (This is also useful to avoid ugly stack traces when developing Spark applications and quitting the console before stopping your SparkContext.) Finally, since sbt build definitions are just Scala code, you can conditionalize these command sets, for example, to only load test fixtures sometimes.
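
If you just want to see the shape of these settings before a real example, a minimal sketch in a plain .sbt build definition looks something like this (the println commands stand in for whatever setup and teardown your project needs):

Minimal initialCommands/cleanupCommands sketch
// Runs automatically when `sbt console` starts a REPL session
initialCommands in console := """println("starting a console session")"""

// Runs automatically when the REPL session ends
cleanupCommands in console := """println("cleaning up")"""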

Here’s what this sort of automation looks like in a real build definition, with an excerpt of the Build.scala file from a (not-yet-merged) feature branch on my sur-la-plaque project, showing how I added automation to the REPL for the analysis subproject:2

excerpt of Build.scala
def optionallySetupFixtures = {
  sys.env.get("SLP_FIXTURES_FROM") match {
    case Some(dir: String) => s"""
      |val data = app.processFiles(SLP.listFilesInDir("$dir"))
      |data.registerAsTable("trackpoints")
    """.stripMargin
    case _ => ""
  }
}

def analysisSettings = baseSettings ++ sparkSettings ++ breezeSettings ++ dispatchSettings ++ testSettings ++ Seq(
  initialCommands in console :=
    """
      |import org.apache.spark.SparkConf
      |import org.apache.spark.SparkContext
      |import org.apache.spark.rdd.RDD
      |import com.freevariable.surlaplaque.importer._
      |import com.freevariable.surlaplaque.data._
      |import com.freevariable.surlaplaque.app._
      |
      |val conf = new SparkConf().setMaster("local[8]").setAppName("console").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      |val sc = new SparkContext(conf)
      |val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      |val app = new SLP(sc)
      |import sqlContext._
      |
    """.stripMargin + optionallySetupFixtures,
  cleanupCommands in console := "app.stop"
)

First, I’ve declared a simple optionallySetupFixtures function that generates code to load test data and register it with Spark SQL, but only if SLP_FIXTURES_FROM is set in the environment with the name of a directory containing activity files. The analysisSettings function returns a Seq of settings for the analysis subproject, first combining common settings, test settings, and library-specific settings for its dependencies (these are all declared elsewhere in the file). To this combination of common settings, we then add

  1. an initialCommands setting to ensure that our REPL session imports Spark and sur-la-plaque libraries and sets up SparkContext and SQLContext instances, and
  2. a cleanupCommands setting to gracefully shut down the SparkContext when we exit the REPL (via the stop method in the SLP application class)

Note that the initialCommands setting is the result of appending the static settings (our imports and variable declarations) with the result of calling optionallySetupFixtures, which will either be code to load and register our data or nothing, depending on our environment.

This functionality makes it easy to develop custom REPL environments or just save a lot of time while interactively experimenting with new techniques in your project. Even better, the investment required is absolutely minimal compared to the payoff of not having to paste or type boilerplate code in to every REPL session.


  1. This feature is mentioned in the documentation and used by Apache Spark, so I was familiar with it, but — for whatever reason — I hadn’t thought to apply it to my own projects until recently. I’m mentioning it here in case you didn’t think of it either!

  2. sur-la-plaque is a collection of applications dedicated to making sense of bicycling activity data; you can read more about some of the tools it includes here. The code is currently structured in two sbt projects: one for analysis code that actually processes data, and one that provides a web-based viewer of analysis results.

In earlier posts we introduced the concepts of type widening and type translation, discussed support for these in existing database systems, and presented a general approach to implementing type widening. In this post, we’ll extend our simple interpreter with support for type translations.

Adding functions to SimpleInterpreter

While we could implement type translation for AddExpression, we’d want to do so in such a way as to resolve the ambiguity inherent in adding a StringType and an IntType: should we coerce the StringType to an integer value or convert the IntType to a string representation? Put another way, should "12" + 3 evaluate to "123" or 15?1 While this is an interesting aesthetic question, we’ll not treat it further in this post.

Instead, we’ll extend our simple interpreter trait with support for functions:

LessSimpleInterpreter.scala
object LessSimpleInterpreter extends SimpleInterpreter {

  class BinaryFunction[L,R,Result](val lhs: Expression[L], val rhs: Expression[R], val f: ((L, R) => Result))
      extends Expression[Result] {
    type left = L
    type right = R
    type result = Result

    def eval: Value[Result] = {
      val lv = lhs.eval
      val rv = rhs.eval
      new Value[Result](f(lv.get, rv.get)) {}
    }
  }

  object BinaryFunction {
    def apply[L,R,Result](lhs: Expression[L], rhs: Expression[R], f: ((L, R) => Result)) =
      new BinaryFunction(lhs, rhs, f)

    def unapply[L,R,Result](bf: BinaryFunction[L,R,Result]):
        Option[(Expression[L],Expression[R],((L, R) => Result))] =
      Some((bf.lhs, bf.rhs, bf.f))
  }
}

Notice that we don’t declare BinaryFunction as a case class, since we might want to declare other case classes that inherit from it.2 We could thus use BinaryFunction directly, like this:

BinaryFunction example
scala> import LessSimpleInterpreter._
import LessSimpleInterpreter._

scala> val max = BinaryFunction(IntValue(4), IntValue(5), ((a:Int, b:Int) => if (a > b) a else b))
max: LessSimpleInterpreter.BinaryFunction[Int,Int,Int] = LessSimpleInterpreter$BinaryFunction@301ec38b

scala> max.eval.get
res0: Int = 5

Alternatively, we could implement subclasses of BinaryFunction with baked-in function parameters, as follows:

MaxFunction.scala
import LessSimpleInterpreter._

class MaxFunction[T, Result](lhs: Expression[T], rhs: Expression[T])
    (implicit ev: T => Ordered[T])
    extends BinaryFunction(lhs, rhs, ((a:T, b:T) => if (a > b) a else b)) { }

Note that, in this case (with no type coercion), we use a single class representing both a function and a function application. We’ll be changing that shortly.

Combining widening and function application

We’ll first produce an interpreter that combines both type widening and function application, using our “unzipped” widening interpreter, which finds widenings one parameter at a time, as a starting point.3 In the following example, we’ve created separate classes representing function definitions (BinaryFunction) and function application expressions (BinaryFunctionAppl). Each function definition has formal parameter types (that is, the parameter types it expects) and a result type; each function application has actual parameter types, which are the types of the expressions supplied as actual parameters. In a function application, we denote the formal parameter types as Lf and Rf and the actual parameter types as La and Ra. (As usual, these types can be equal, but they need not be.) Just like Apache Hive does when automatically wrapping UDF functions in the GenericUDF interface, we define the formal parameter types based on the formal parameter types of the underlying function implementation.

WideningFunctionInterpreter.scala
object WideningFunctionInterpreter extends UnzippedWideningInterpreter {
  class BinaryFunction[L <: Type, R <: Type, Result <: Type]
      (fun: (L#nativeType, R#nativeType) => Result#nativeType) {

    def apply(lhs: L#nativeType, rhs: R#nativeType): Result#nativeType = fun(lhs, rhs)
  }

  class BinaryFunctionAppl[La <: Type, Ra <: Type, Lf <: Type, Rf <: Type, Result <: Type]
      (val lhs: Expression[La], val rhs: Expression[Ra], val f: BinaryFunction[Lf, Rf, Result])
      (implicit lconv: Value[La] => Value[Lf], rconv: Value[Ra] => Value[Rf])
      extends Expression[Result] {
    type left = Lf
    type right = Rf
    type result = Result

    def eval: Value[Result] = {
      val lv = lconv(lhs.eval)
      val rv = rconv(rhs.eval)
      new Value[Result](f(lv.get, rv.get)) {}
    }
  }
}

In order to have a valid function application, we need two witness functions (here called lconv and rconv) to convert values of the actual parameter types La and Ra to values of the formal parameter types Lf and Rf. We can see how this works in practice by defining a maximum function over DoubleType values and applying it to IntType values:

WideningFunctionInterpreter example
scala> import WideningFunctionInterpreter._
import WideningFunctionInterpreter._

scala> val max = new BinaryFunction[DoubleType,DoubleType,DoubleType]((x: Double, y: Double) => if (x > y) x else y)
max: WideningFunctionInterpreter.BinaryFunction[WideningFunctionInterpreter.DoubleType,WideningFunctionInterpreter.DoubleType,WideningFunctionInterpreter.DoubleType] = WideningFunctionInterpreter$BinaryFunction@72a7aa4f

scala> val appl = new BinaryFunctionAppl(IntValue(5), IntValue(7), max)
appl: WideningFunctionInterpreter.BinaryFunctionAppl[WideningFunctionInterpreter.IntType,WideningFunctionInterpreter.IntType,WideningFunctionInterpreter.DoubleType,WideningFunctionInterpreter.DoubleType,WideningFunctionInterpreter.DoubleType] = WideningFunctionInterpreter$BinaryFunctionAppl@1a536164

scala> val result = appl.eval.get
result: WideningFunctionInterpreter.DoubleType#nativeType = 7.0

Note that the problem of widening actuals to formals is even more straightforward than finding the least upper bound type of two operands in an addition, since we know what the formals are expected to be and thus know immediately whether we have relevant widenings or not.

Adding translation

The final enhancement to our simple interpreter is to add type translation. Since we can treat type widening as a limited case of type translation, our approach will handle both. For the sake of a straightforward example, we’ll simply allow converting from string values to doubles (à la Hive and other untyped database systems) as well as converting from a value to one of a wider type.

We’ll define a trait called ↪ and declare witness instances A ↪ B if there is a way to translate from a Value[A] to a Value[B]. Since we can’t statically guarantee that type translations will succeed in general, A ↪ B will be implemented as a partial function from Value[A] to Value[B].

TranslatingFunctionInterpreter.scala
import scala.util.{Try, Success}

object TranslatingFunctionInterpreter extends UnzippedWideningInterpreter {
  trait ↪[A <: Type, B <: Type] extends PartialFunction[Value[A], Value[B]]

  implicit def wideningTranslation[A <: Type, B <: Type]
       (implicit ev: A#nativeType => B#nativeType): ↪[A, B] = new ↪[A, B] {
     def apply(a: Value[A]): Value[B] = new Value[B](ev(a.get))
     def isDefinedAt(a: Value[A]) = true
  }

  implicit object StringToDouble extends ↪[StringType,DoubleType] {
    def apply(s: Value[StringType]) = {
      DoubleValue(Try(s.get.toDouble).toOption.get)
    }

    def isDefinedAt(s: Value[StringType]) = {
      Try(s.get.toDouble) match {
        case Success(_) => true
        case _ => false
      }
    }
  }

  class BinaryFunction[L <: Type, R <: Type, Result <: Type]
      (fun: (L#nativeType, R#nativeType) => Result#nativeType) {

    def apply(lhs: L#nativeType, rhs: R#nativeType): Result#nativeType = fun(lhs, rhs)
  }

  class BinaryFunctionAppl[La <: Type, Ra <: Type, Lf <: Type, Rf <: Type, Result <: Type]
      (val lhs: Expression[La], val rhs: Expression[Ra], val f: BinaryFunction[Lf, Rf, Result])
      (implicit lconv: La ↪ Lf, rconv: Ra ↪ Rf)
      extends Expression[Result] {
    type left = Lf
    type right = Rf
    type result = Result

    def eval: Value[Result] = {
      val lv = lconv(lhs.eval)
      val rv = rconv(rhs.eval)
      new Value[Result](f(lv.get, rv.get)) {}
    }
  }
}

Once we’ve implemented the partial functions to convert strings to doubles, our interpreter is very similar to the type-widening interpreter. We can see that it works by attempting to take the maximum of a double and a string representation of a double:

TranslatingFunctionInterpreter example
scala> import TranslatingFunctionInterpreter._
import TranslatingFunctionInterpreter._

scala> val max = new BinaryFunction[DoubleType,DoubleType,DoubleType]((x: Double, y: Double) => if (x > y) x else y)
max: TranslatingFunctionInterpreter.BinaryFunction[TranslatingFunctionInterpreter.DoubleType,TranslatingFunctionInterpreter.DoubleType,TranslatingFunctionInterpreter.DoubleType] = TranslatingFunctionInterpreter$BinaryFunction@4ab550d5

scala> val appl = new BinaryFunctionAppl(DoubleValue(5.0), StringValue("7.0"), max)
appl: TranslatingFunctionInterpreter.BinaryFunctionAppl[TranslatingFunctionInterpreter.DoubleType,TranslatingFunctionInterpreter.StringType,TranslatingFunctionInterpreter.DoubleType,TranslatingFunctionInterpreter.DoubleType,TranslatingFunctionInterpreter.DoubleType] = TranslatingFunctionInterpreter$BinaryFunctionAppl@6e1b9411

scala> appl.eval.get
res0: TranslatingFunctionInterpreter.DoubleType#nativeType = 7.0

In the remainder of this post, we’ll sketch a couple of simple extensions to this basic approach.

Dealing with failures

Note that our interpreter will crash if asked to evaluate a function application for which type translation fails. Failure is probably fine in these cases for general-purpose programming languages (although we’d probably want to model failure in the interpreted language or otherwise do something nicer than terminating with a Scala MatchError because a partial function isn’t defined for its input). However, if we’re interested in emulating database query languages, we have another option for dealing with translation failures: simply return a null value.

Since most database query language functions — both built-in and user-defined — must handle null values appropriately, this wouldn’t necessarily complicate function implementations any further. It would require some tweaks to the way we’ve ascribed types to this program; in particular, translation functions and function applications would have to return either a value of the translated type or null (if translation failed).4
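
One lightweight way to sketch a non-crashing alternative (closer to the Option-returning variant mentioned in the footnote than to returning a true database null) is to give BinaryFunctionAppl an evaluation method that consults the translations’ isDefinedAt before applying them. This is an illustrative addition with a hypothetical name, not part of the interpreter above:

Hypothetical safe evaluation sketch
// Inside BinaryFunctionAppl: return None instead of crashing when either
// actual-to-formal translation is undefined for the evaluated value.
def evalOption: Option[Value[Result]] = {
  val lv = lhs.eval
  val rv = rhs.eval
  if (lconv.isDefinedAt(lv) && rconv.isDefinedAt(rv))
    Some(new Value[Result](f(lconv(lv).get, rconv(rv).get)) {})
  else
    None
}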

Supporting polymorphism

One interesting remaining question is this: can we extend our interpreter to allow encoding simple generic functions? By “simple” generic functions, I mean those whose parameter types may consist of either concrete types or type variables, but whose output type is either not a type variable or is the same as one of its input types. As a concrete example, let’s say we wanted a function pow[X](base: X, exp: X): X where X could be either a double-precision floating-point value or an arbitrary-precision decimal — but, in either case, the result type would be the same as the actual parameter types.

Assuming the existence of appropriate object-language encodings for type constraints, subtyping, and substitutability relations between interpreter types, we could proceed via iterative application of widening and translation until we reached a fixed point. (This process would terminate so long as the widening and translation relations imposed a complete partial order on the set of interpreted-language types.)

However, we’d likely not need such generality to implement polymorphic functions in a query language. Apache Hive, for example, allows coercing strings to either DECIMAL or DOUBLE values. So a simple approach would be to find the narrowest of those two types that string arguments could be coerced to (call this type TLub) and then widen all other arguments of type X to TLub.5

A parting thought

The interpreters presented in these posts took care to get things implemented safely (OK, as safely as practical) in Scala’s type system. There are almost certainly better ways to statically capture the safety properties we might care about. However, if we were in a more dynamic context — such as in an untyped language, or in a trivially-typed fragment of a Scala program6 — many of these approaches would admit terser implementations.


  1. Some related questions: should "12" + 3 evaluate to the same thing as 12 + "3"? Should both be legal? Java allows the former (evaluating it as string concatenation by invoking .toString on an automatically-boxed 3), but not the latter.

  2. We mimic some (but not all) case class functionality here. We have declared an apply method to the companion object, to allow constructing BinaryFunction instances without new, and an unapply method to allow matching and deconstructing BinaryFunction instances.

  3. See this post for more details on this approach.

  4. Given NullType extending Type, we’d have a few options: we could return an instance of Scala’s Either[A <: Value[_], Value[NullType]]; we could convert all interpreter eval methods to return an Option[Value[_]]; we could relax static typing on function return values; etc.

  5. As a further practical consideration, many of Hive’s numeric functions that accept either DECIMAL or DOUBLE values only operate on double-precision values and thus narrow DECIMAL arguments internally at evaluation time.

  6. Consider, for example, representing all interpreter values as Option[Any] and using explicit casts.

In this installment of our series on type coercions, we’re going to introduce a way to support type widening in a language interpreter. We’ll present a general approach based on semilattices of types and a particular implementation of this approach that uses a straightforward encoding in Scala’s type system. We’ll work from a simple interpreter to allow for a clear exposition, but our general techniques will be applicable to more involved languages as well.

Widening functions

A widening function maps values from some type T to a wider type U. We can implement a trivial but generic type-widening method based on the partial ordering of types encoded in Scala’s type system:

Widening example
scala> def widen[T,U](t: T)(implicit w: T => U): U = w(t)

scala> val longFive = widen[Int,Long](5)
longFive: Long = 5

scala> val doubleFive = widen[Int,Double](5)
doubleFive: Double = 5.0

scala> val arbitraryFive = widen[Int,BigDecimal](5)
arbitraryFive: BigDecimal = 5

Invoking widen[A,B] on an argument of type A will succeed if there is a witness object for A => B. By default, we’ll be able to see the predefined widenings in Scala, including reflexive widenings. Note that there are no implicitly defined narrowings, though:

Exploring witness objects for predefined widenings
scala> implicitly[Int => Double]
res0: Int => Double = <function1>

scala> implicitly[Int => Int]
res1: Int => Int = <function1>

scala> implicitly[Double => Int]
<console>:8: error: No implicit view available from Double => Int.
              implicitly[Double => Int]
                        ^

It’s important to note that we could declare other witnesses to A => B for other A and B types and have them in scope; we aren’t constrained by Scala’s predefined definitions or by static relationships between implementation types. (We’ll come back to this point later, when we’re thinking about how to model types in the interpreted language.)
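
For instance, we can declare a witness for a widening that Scala doesn’t predefine and bring it into scope ourselves (Celsius here is just an illustrative type):

Declaring a custom widening witness
// An illustrative type with no predefined widening from Int
case class Celsius(degrees: Double)

// Declaring our own witness makes widen[Int, Celsius] resolvable
implicit val intToCelsius: Int => Celsius = (i: Int) => Celsius(i.toDouble)

widen[Int, Celsius](20)   // evaluates to Celsius(20.0)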

A simply-typed interpreter

We’ll start with a simple interpreter with four kinds of values (integers, doubles, strings, and nulls) and one kind of expression representing numeric addition or string concatenation, depending on the types of its operands. It is a stretch to call the language embodied in this interpreter “typed” (since it has only literal values and expressions but no variables). However, because of the way we’ve encoded the interpreter in Scala, it is impossible to express programs with runtime errors. In particular, it is only possible to create an AddExpression with two arguments that evaluate to values of the same type.

SimpleInterpreter.scala
trait SimpleInterpreter {

  trait Addable[T] {
    def plus(self: T, other: T): Value[T]
  }

  implicit object IntAddable extends Addable[Int] {
    def plus(self: Int, other: Int) = IntValue(self + other)
  }

  implicit object DoubleAddable extends Addable[Double] {
    def plus(self: Double, other: Double) = DoubleValue(self + other)
  }

  implicit object StringAddable extends Addable[String] {
    def plus(self: String, other: String) = StringValue(self + other)
  }

  abstract class Expression[T] {
    def eval: Value[T]
  }

  abstract class Value[T](v: T) extends Expression[T] {
    def eval: Value[T] = this
    def get: T = v
  }

  case class IntValue(v: Int) extends Value(v) {}
  case class DoubleValue(v: Double) extends Value(v) {}
  case class StringValue(v: String) extends Value(v) {}
  case object NullValue extends Value(null) {}

  case class AddExpression[T](lhs: Expression[T], rhs: Expression[T])(implicit ev: Addable[T]) extends Expression[T] {
    def eval: Value[T] = {
      val lv = lhs.eval
      val rv = rhs.eval
      ev.plus(lv.get, rv.get)
    }
  }
}

object SimpleInterpreter extends SimpleInterpreter { }

Adding widening

If we have an expression of the form t1 • t2, where the left-hand side is of type T1 and the right-hand side is of type T2, we will be able to convert this to an expression in which both operands have the same type if the following conditions are met:

  1. There must exist some type U such that T1 ≤ U and T2 ≤ U under our partial ordering of types, and
  2. There must exist widening functions with the signatures T1 ⇒ U and T2 ⇒ U.

Finding U is simply finding the least upper bound of T1 and T2 on a semilattice of types. Once we have this least upper bound, if we also have appropriate widening functions, we can convert both t1 and t2 to values of the same type. In the following code, we extend our simple interpreter by modeling interpreter types in Scala, making types properties of values, and adding widening conversions to AddExpression by explicitly encoding the partial ordering of types — in this case, based on a relationship between the nativeType type member of each Type class. (We’re doing widening statically through Scala’s type system in this case, but there’s no reason why we couldn’t take a similar approach dynamically, handling errors by raising exceptions at runtime.)

WideningInterpreter.scala
object WideningInterpreter {
  import scala.language.implicitConversions

  sealed abstract class Type {
    type nativeType <: Any
  }
  class IntType extends Type {
    override type nativeType = Int
  }
  class DoubleType extends Type {
    override type nativeType = Double
  }
  class StringType extends Type {
    override type nativeType = String
  }

  abstract class Expression[K <: Type] {
    def eval: Value[K]
  }

  class Value[K <: Type](v: K#nativeType) extends Expression[K] {
    def eval: Value[K] = this
    def get: K#nativeType = v
  }

  case class IntValue(v: Int) extends Value[IntType](v) {}
  case class DoubleValue(v: Double) extends Value[DoubleType](v) {}
  case class StringValue(v: String) extends Value[StringType](v) {}

  sealed trait Addable[K <: Type] {
    def plus(self: K#nativeType, other: K#nativeType): Value[K]
  }

  implicit object IntAddable extends Addable[IntType] {
    def plus(self: Int, other: Int) = IntValue(self + other)
  }

  implicit object DoubleAddable extends Addable[DoubleType] {
    def plus(self: Double, other: Double) = DoubleValue(self + other)
  }

  implicit object StringAddable extends Addable[StringType] {
    def plus(self: String, other: String) = StringValue(self + other)
  }

  // We need some way to constrain our generic widening operators so
  // that an expression with identical operand types won't have an
  // ambiguous implicit argument.  One way to do this is to make sure
  // that one of the widening functions will only apply if the arguments
  // are of different types.  

  // These type inequality instances are taken from an answer Miles Sabin 
  // gave on StackOverflow:  http://stackoverflow.com/a/6944070

  trait =!=[A, B]
  implicit def neq[A, B] : A =!= B = null
  implicit def neqAmbig1[A] : A =!= A = null
  implicit def neqAmbig2[A] : A =!= A = null

  implicit def leftWiden[T <: Type, U <: Type](v1: Value[T], v2: Value[U])
      (implicit conv: (T#nativeType => U#nativeType)): (Value[U], Value[U]) =
    (new Value[U](conv(v1.get)), v2)

  implicit def rightWiden[T <: Type, U <: Type](v1: Value[U], v2: Value[T])
      (implicit neq: T =!= U,
                conv: (T#nativeType => U#nativeType)): (Value[U], Value[U]) =
    (v1, new Value[U](conv(v2.get)))

  case class AddExpression[T <: Type, U <: Type, V <: Type]
      (lhs: Expression[T], rhs: Expression[U])
      (implicit widen: (Value[T], Value[U]) => (Value[V], Value[V]), adder: Addable[V]) extends Expression[V] {
    def eval = {
      val lv = lhs.eval
      val rv = rhs.eval
      val args = widen(lv, rv)
      adder.plus(args._1.get, args._2.get)
    }
  }
}

In WideningInterpreter we extend AddExpression by allowing it to have two potentially distinct argument types (Value[T] and Value[U]) and by requiring evidence of an implicit conversion from a pair of values with distinct types to a pair of values with the same type.1 We define two witness functions: leftWiden for the case in which the left element of the pair is narrower than the right, and rightWiden for the case in which the right element of the pair is narrower than the left. In both cases, we determine that a type T is narrower than another type U if Scala knows how to widen values of the representation type (Type#nativeType) of T to the representation type of U; this is the case if an implicit resolution exists for the conv argument.

The problem we might encounter is that, because our partial ordering is reflexive, if T and U are the same type, then there will be witnesses both for T#nativeType => U#nativeType and U#nativeType => T#nativeType. So if we were to have naive implementations of leftWiden and rightWiden that only depended on evidence of such a conversion, Scala would be unable to unambiguously resolve which would apply in the case of monomorphic AddExpressions. We resolve this problem by adding a test for type inequality (due to Miles Sabin) to the implicit argument list of rightWiden, so that it will not apply if the arguments are of the same type.2
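
To make the resolution concrete, here is a small usage sketch (mine, not part of WideningInterpreter.scala); it assumes that Scala’s standard implicit widenings (such as scala.Int.int2double) and Predef’s reflexive conversion are available to satisfy the conv argument:

import WideningInterpreter._

// Mixed operand types: only leftWiden applies (there is no implicit
// Double => Int in scope), so V is inferred as DoubleType.
AddExpression(IntValue(1), DoubleValue(2.5)).eval.get    // 3.5

// Identical operand types: rightWiden is ruled out by the =!= evidence,
// so leftWiden (with a trivial Int => Int conversion) is chosen
// unambiguously and V is inferred as IntType.
AddExpression(IntValue(1), IntValue(2)).eval.get         // 3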

Note that the partial ordering among interpreter types (IntType, DoubleType, and StringType) does not depend on Scala-level subtyping relationships between interpreter types. This is important because, in a more realistic language, we will want the flexibility to model the interpreted language’s data types independently of the subtyping relationships of our implementation language. Instead, the partial ordering in this example is derived from predefined relationships between representation types, and we could extend it to other types simply by bringing additional implicit conversions of the form A => B between representation types into scope.
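
For example (a hypothetical sketch of mine; since Type and Addable are sealed, these definitions would have to live alongside the others inside WideningInterpreter), adding a long integer type requires only a new Type, a value wrapper, and an Addable instance; the orderings with IntType and DoubleType then follow from Scala’s built-in Int => Long and Long => Double conversions:

class LongType extends Type {
  override type nativeType = Long
}
case class LongValue(v: Long) extends Value[LongType](v) {}

implicit object LongAddable extends Addable[LongType] {
  def plus(self: Long, other: Long) = LongValue(self + other)
}

// AddExpression(IntValue(1), LongValue(2L)) would then widen its left
// operand and evaluate to a Value[LongType].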

For a small number of interpreter types, we could also explicitly encode the partial ordering, as in the example below:

A more explicit partial ordering encoding
implicit def intDoubleWiden(v1: Value[IntType], v2: Value[DoubleType]): (Value[DoubleType], Value[DoubleType]) =
  (DoubleValue(v1.get.toDouble), v2)

implicit def doubleIntWiden(v1: Value[DoubleType], v2: Value[IntType]): (Value[DoubleType], Value[DoubleType]) =
  (v1, DoubleValue(v2.get.toDouble))

Since in this example we have a total ordering among types, we can also easily widen one argument at a time by adding a witness object for the least upper bound of T and U,3 as in the example below:

A partial ordering encoding that finds appropriate widenings one argument at a time
implicit def widen[T <: Type, U <: Type](v1: Value[T])
    (implicit conv: (T#nativeType => U#nativeType)): (Value[U]) =
  new Value[U](conv(v1.get))

implicit def reflexiveWiden[T <: Type](v1: Value[T]): Value[T] = v1

trait LT[T <: Type, U <: Type] {}
implicit def intDoubleLT: LT[IntType, DoubleType] = null

trait WiderThan[T <: Type, U <: Type, V <: Type] {}
implicit def rightWider[T <: Type, U <: Type, V <: Type]
  (implicit rw: LT[T, U],
            conv2: (U#nativeType => V#nativeType),
            conv1: (T#nativeType => V#nativeType)): WiderThan[T,U,V] = null

implicit def leftWider[T <: Type, U <: Type, V <: Type]
  (implicit rw: LT[U, T],
            conv2: (T#nativeType => V#nativeType),
            conv1: (U#nativeType => V#nativeType)): WiderThan[T,U,V] = null

implicit def reflWider[T <: Type]: WiderThan[T, T, T] = null

case class AddExpression[T <: Type, U <: Type, V <: Type]
    (lhs: Expression[T], rhs: Expression[U])
    (implicit lub: WiderThan[T, U, V],
              widenLeft: Value[T] => Value[V],
              widenRight: Value[U] => Value[V],
              adder: Addable[V]) extends Expression[V] {
  def eval = {
    val lv = widenLeft(lhs.eval)
    val rv = widenRight(rhs.eval)
    adder.plus(lv.get, rv.get)
  }
}

Again, a similar approach would be applicable in an untyped Scala representation of interpreter-language types: we could represent types as terms, implement the least-upper-bound relation as a partial function mapping from a pair of terms to the least upper bound of the pair, and implement widenings as functions taking a value and a term representing the type to widen to.
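
As a rough sketch of what that might look like (this is my illustration, not code from the original posts):

object DynamicWidening {
  // Interpreter types represented as terms.
  sealed trait DynType
  case object IntT extends DynType
  case object DoubleT extends DynType
  case object StringT extends DynType

  def typeOf(v: Any): DynType = v match {
    case _: Int    => IntT
    case _: Double => DoubleT
    case _: String => StringT
  }

  // The least-upper-bound relation as a partial function; pairs with no
  // upper bound fall outside its domain.
  val lub: PartialFunction[(DynType, DynType), DynType] = {
    case (t, u) if t == u                  => t
    case (IntT, DoubleT) | (DoubleT, IntT) => DoubleT
  }

  // Widening as a function from a value and a target type term, with a
  // runtime error when no widening exists.
  def widen(v: Any, to: DynType): Any = (v, to) match {
    case (i: Int, DoubleT)    => i.toDouble
    case _ if typeOf(v) == to => v
    case _                    => sys.error(s"cannot widen ${typeOf(v)} to $to")
  }
}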


  1. See the signature for the widen implicit argument to AddExpression: (Value[T], Value[U]) => (Value[V], Value[V])

  2. This type-inequality test will fail if it is given two identical types because both neqAmbig1 and neqAmbig2 will be applicable.

  3. It occurred to me while developing these examples that using witness objects in this way is a lot like forward-chaining logic programming. (Note that we use negation-as-failure in implicit resolution when testing for type inequality and we use implicit arguments to guide further implicit resolution with the WiderThan witness.) Unsurprisingly, it turns out that other people have had the same idea! See the discussion on this post or this talk for two examples.

In my last post, I introduced two kinds of implicit type coercions that can appear in database query languages: type widenings, in which values are converted to wider types (e.g. from an int to a long or double), and type translations, in which a value of some type T might be converted to one of an unrelated type U if it is used where a value of U is expected. In this post, we’ll look at what sort of type coercions are available in Apache Hive and (in less detail) Microsoft SQL Server.

Implicit conversions in Apache Hive

Apache Hive features several kinds of types, many of which are also present in ANSI SQL with similar definitions:

  1. hardware-supported integral types, such as tinyint (one byte), smallint (two bytes), int (four bytes), and bigint (eight bytes);
  2. hardware-supported floating-point types, such as float (single-precision, four bytes) and double (double-precision, eight bytes);
  3. decimal values (38 digits precision in Hive 0.11 and 0.12; arbitrary-precision in Hive 0.13.0 and later);
  4. date and time types, such as timestamp and date;
  5. string types, including string (of arbitrary length), varchar[N] (of variable length up to N characters), and char[N] (of exactly N characters, possibly padded with spaces);
  6. boolean values;
  7. binary values (sequences of bytes); and
  8. compound values made up of Hive types: homogeneous arrays with some element type, maps containing keys of one type and values of another, and C-style struct and union types.

Hive supports some widenings and narrowings between these types.1 Among the hardware-supported numeric types, values can be widened but not narrowed.2 Strings can be narrowed to be used as varchar values; converting a string value to a varchar[N], where N is insufficient to hold the contents of the string, will cause the string to be truncated to N characters. It is also possible (as of Hive 0.13) to supply a decimal argument to many numeric functions that expect a double input, although in most cases the function will only process a double approximating the supplied arbitrary-precision value.

Hive also supports type translations to and from string values. Hive permits implicitly converting a value of any type (with the exception of boolean and binary) to a string. String representations of double or decimal values (but not the smaller integral or floating-point types) can also be converted to values of those types.

Hive supports widenings as part of object comparisons; the FunctionRegistry.getCommonClassForComparison method returns the least upper bound of two types. The code excerpt below shows how Hive also explicitly encodes which widenings and translations are permissible:

excerpted from Hive 0.12’s FunctionRegistry.java
public static boolean implicitConvertable(PrimitiveCategory from, PrimitiveCategory to) {
  if (from == to) {
    return true;
  }

  PrimitiveGrouping fromPg = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(from);
  PrimitiveGrouping toPg = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(to);

  // Allow implicit String to Double conversion
  if (fromPg == PrimitiveGrouping.STRING_GROUP && to == PrimitiveCategory.DOUBLE) {
    return true;
  }
  // Allow implicit String to Decimal conversion
  if (fromPg == PrimitiveGrouping.STRING_GROUP && to == PrimitiveCategory.DECIMAL) {
    return true;
  }
  // Void can be converted to any type
  if (from == PrimitiveCategory.VOID) {
    return true;
  }

  // Allow implicit String to Date conversion
  if (fromPg == PrimitiveGrouping.DATE_GROUP && toPg == PrimitiveGrouping.STRING_GROUP) {
    return true;
  }
  // Allow implicit Numeric to String conversion
  if (fromPg == PrimitiveGrouping.NUMERIC_GROUP && toPg == PrimitiveGrouping.STRING_GROUP) {
    return true;
  }
  // Allow implicit String to varchar conversion, and vice versa
  if (fromPg == PrimitiveGrouping.STRING_GROUP && toPg == PrimitiveGrouping.STRING_GROUP) {
    return true;
  }

  // Allow implicit conversion from Byte -> Integer -> Long -> Float -> Double
  // Decimal -> String
  Integer f = numericTypes.get(from);
  Integer t = numericTypes.get(to);
  if (f == null || t == null) {
    return false;
  }
  if (f.intValue() > t.intValue()) {
    return false;
  }
  return true;
}

To see how Hive actually performs type coercions, we’ll have to take a step back and look at Hive’s architecture for defining functions.3 Hive has two interfaces for defining functions: UDF, which models a simple function with simply-typed arguments and a simply-typed return value, and GenericUDF, which models functions that can operate on and return values of compound types.

Subclasses of UDF include at least one method called evaluate (of arbitrary argument and return types); this is what gets called when the user-defined function is evaluated. Because of their flexible signatures, these evaluate methods are not specified in any interface but are instead found via Java reflection. By contrast, a GenericUDF must support an initialize method that takes an array of ObjectInspector instances (essentially adapters from arbitrary types to concrete object values) and an evaluate method taking an array of DeferredObject instances (essentially futures representing objects).

The initialize method in GenericUDF is invoked with ObjectInspector instances corresponding to actual parameters; if the actuals aren’t implicitly convertible to the proper types, it will fail. Otherwise, it will return an ObjectInspector instance for the return type. As a simple example, see the initialize method in the class providing Hive’s implementation of the SQL CONCAT function:

excerpted from Hive 0.12’s GenericUDFConcatWS.java
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
  if (arguments.length < 2) {
    throw new UDFArgumentLengthException(
        "The function CONCAT_WS(separator,[string | array(string)]+) "
          + "needs at least two arguments.");
  }

  // check if argument is a string or an array of strings
  for (int i = 0; i < arguments.length; i++) {
    switch(arguments[i].getCategory()) {
      case LIST:
        if (isStringOrVoidType(
            ((ListObjectInspector) arguments[i]).getListElementObjectInspector())) {
          break;
        }
      case PRIMITIVE:
        if (isStringOrVoidType(arguments[i])) {
        break;
        }
      default:
        throw new UDFArgumentTypeException(i, "Argument " + (i + 1)
          + " of function CONCAT_WS must be \"" + serdeConstants.STRING_TYPE_NAME
          + " or " + serdeConstants.LIST_TYPE_NAME + "<" + serdeConstants.STRING_TYPE_NAME
          + ">\", but \"" + arguments[i].getTypeName() + "\" was found.");
    }
  }

  argumentOIs = arguments;
  return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
}

Note that the above verifies both that the number of arguments is correct and that each argument has an acceptable type before returning an ObjectInspector instance for writable strings. The evaluate method then invokes DeferredObject.get() on each argument, converts the resulting objects to String values using built-in coercions, and concatenates them, returning the result as a text value.

Plain UDF instances and GenericUDF instances alike are stored in Hive’s function registry, but the former are first converted to GenericUDF instances by wrapping them in GenericUDFBridge, a proxy that uses Java introspection on the underlying UDF instance to determine the function’s expected argument types; it can then convert actual parameters to values of the appropriate types using built-in coercions at execution time.

Implicit conversions in Microsoft SQL Server

While we can’t examine conversions supported in Microsoft SQL Server in as great detail as we can with Apache Hive (since the source for SQL Server isn’t available), the published documentation indicates which conversions are supported. In brief, SQL Server supports most of the same kinds of type coercions as Hive, with the following additions:

  1. implicit translation in both directions between the character types (char[N] and varchar[N]) and all numeric types (not merely double and decimal, as in Hive);
  2. financial types (money and smallmoney) are supported and can be implicitly translated to and from numeric types;
  3. implicit translation in both directions between timestamp values and character and integral types;
  4. the sql_variant type, which can receive values of most types via implicit conversions but must be converted with an explicit CAST in contexts expecting a value of a different type; and
  5. various other types (xml, uniqueidentifier, and user-defined types from the CLR) with varying conversion semantics.

These additions are useful but their absence does not limit Hive’s expressive power. In the next post in this series, we’ll look at a general approach to implementing type widening, along with a specific (and statically-safe) realization of this approach using Scala’s type system.


  1. The Hive wiki includes a full conversion matrix.

  2. For example, it is permissible to use a tinyint where an int or double is expected, but not vice versa.

  3. Actually implementing new functions in Hive is outside the scope of this post, but there are lots of resources online if you’re interested. In particular, Matthew Rathbone has a great article about extending Hive with new functions.

In this post, we’re going to introduce two kinds of implicit type conversions that are common in database query languages:

  1. Type widening, in which a value of type T is converted to a value of some wider type T’, where T ⊆ T’. As an example, given the expression b / d, where b holds an unsigned 8-bit value and d holds a double-precision floating-point value, a type-widening conversion would convert the value of b to a double-precision value and use this value as the left-hand operand of a floating-point division.
  2. Type translation, in which a value is transformed to produce a value of some unrelated type. As an example, consider a programming language that evaluates the expression sd / d as floating-point division where d is a double-precision floating-point value and sd is a string value that represents a number.

Type widening is common in typed general-purpose programming languages. For example, in C or Java, a / b is integer division with an integer result if both a and b are integer values but floating-point division — evaluating to the wider of the types of its operands — if either a or b is a floating-point value. Similarly, it is generally possible in such languages to assign a value of a narrower type to a variable of a wider one.
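
The same distinction can be sketched in a couple of lines of Scala (my example, not taken from any of the systems discussed in this series): widening happens implicitly, while a string operand must be converted explicitly because the language is typed.

val b: Byte = 8
val d: Double = 2.0

// Type widening: b is implicitly widened to Double, so this is
// floating-point division yielding 4.0.
val widened: Double = b / d

// No implicit type translation: the string must be converted explicitly,
// and the conversion can fail at runtime if the string isn't numeric.
val sd: String = "8.0"
val translated: Double = sd.toDouble / d   // also 4.0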

However, type translation (by necessity) only appears in untyped1 languages. This is the case because types are static properties but we cannot in general statically determine whether or not type translations will succeed. By allowing type translation, we are thus necessarily deferring decisions about whether or not a program fragment makes sense in some type system until it executes, and trading some flexibility for the possibility of runtime errors. Some programming communities regard this as an acceptable tradeoff.

Several extant database systems, including Hive, PostgreSQL, SQLite (see also here), and Microsoft SQL Server, support both type widening and type translations, so that one can, for example, take the cosine of a string representing a double-precision floating-point value.

In subsequent posts, we’ll look at what type coercions some of these systems support and present general approaches to implementing support for type widening and type coercion in language interpreters, with specific techniques for realizing these general approaches in interpreters implemented in Scala.


  1. It may also appear in languages in which all programs are trivially well-typed and in which the type system thus cannot provide useful static guarantees about runtime behavior. One example of such a language is Tcl, where everything is a string. (By most definitions of typedness, though, “trivially typed” languages are untyped.)

Yesterday’s post provided a couple of minimal Docker images for spinning up containers to do Scala and Java builds and tests. The second of the images had a pre-loaded Ivy cache for Apache Spark’s dependencies installed in /root. The setup I suggested for launching these images included mounting a directory from your host inside the container using Docker’s -v option, but this is less than ideal: any files that are written or modified during a build or test run will be owned by root instead of by whoever owned the directory on the host!

As I understand it, Docker will be able to map container users to host users in the future, but for now we can work around this problem with a quick hack. The docker run command lets us pass environment variables that will be set inside the container, and we can use this to make sure that any files written in the container get owned by the right user on the host. Here’s what I did:

  1. I created another image based on my Spark development image, which I called willb/java-dev:centos7-spark-uid.
  2. This new image adds a normal user called dockerdev and copies the Ivy, Maven, and sbt caches from /root to that user’s home directory.
  3. The new image runs a small script on container launch that fixes up the user ID and group ID of the dockerdev user and its home directory, based on the DEV_UID and DEV_GID environment variables supplied to docker run, before switching to the dockerdev user and executing a shell.

So to launch a container for Spark development in which your ~/devel directory is mapped to /devel and in which all changes to that directory wind up owned by you on the host, you’d do something like this:

docker run --privileged=true -e DEV_UID=$UID -e DEV_GID=$GID -i -t -v ${HOME}/devel:/devel willb/java-dev:centos7-spark-uid

Again, this is a hack (use it at your own risk!), but it’s useful until Docker supports user mapping more cleanly. I’m interested to hear about your approaches to solving this problem as well!




Appendix: boot.sh, the user-mapping boot script

boot.sh
#!/bin/sh

# Look up the dockerdev user's original uid, gid, and home directory
# as baked into the image.
export ORIGPASSWD=$(cat /etc/passwd | grep dockerdev)
export ORIG_UID=$(echo $ORIGPASSWD | cut -f3 -d:)
export ORIG_GID=$(echo $ORIGPASSWD | cut -f4 -d:)

# Fall back to the original ids if DEV_UID/DEV_GID weren't supplied to docker run.
export DEV_UID=${DEV_UID:=$ORIG_UID}
export DEV_GID=${DEV_GID:=$ORIG_GID}

ORIG_HOME=$(echo $ORIGPASSWD | cut -f6 -d:)

# Rewrite dockerdev's uid and gid in /etc/passwd and /etc/group ...
sed -i -e "s/:$ORIG_UID:$ORIG_GID:/:$DEV_UID:$DEV_GID:/" /etc/passwd
sed -i -e "s/dockerdev:x:$ORIG_GID:/dockerdev:x:$DEV_GID:/" /etc/group

# ... fix up ownership of the home directory (including the copied caches) ...
chown -R ${DEV_UID}:${DEV_GID} ${ORIG_HOME}

# ... and switch to the dockerdev user.
exec su - dockerdev

I recently experienced some bizarre failures running Akka actors and sbt tests in forked mode on my Fedora laptop. As far as I can tell, the root of both problems was that my network configuration changed when switching wireless networks, which caused java.net.InetAddress.getLocalHost() to return my WAN IP address (which processes on my machine couldn’t bind to or connect to) instead of my LAN IP address.1 This was especially frustrating since the relevant changes to my network configuration happened not when I switched networks, but after waking my laptop from sleep on the new network — so tests succeeded before sleep but failed after it!

My patience for troubleshooting networking configurations has diminished substantially since the first time I set up pppd on a Linux 1.1 system, and I was unwilling to disconnect from the network just to run tests. Fortunately, running my tests in a Docker container proved to be a fairly straightforward workaround: Java was able to identify the right address to bind to when running on a bridged network, and the tests ran at native speed. Maintaining a Docker container for Java development has other benefits, as well: while I usually want to work within the Fedora Java ecosystem (to ensure that I can package my work for Fedora), if I’m working on patches to go to upstream projects, I want an environment that more closely resembles the one that upstream CI and conventional deployments will run under.

I’ve created and published a minimal Docker image for a conventional JVM testing environment. This image is based on the CentOS 7 image; while I love Fedora for day-to-day and development use, CentOS is a much slower-moving distribution and is thus far more common as a deployment target. My image also installs java, java-devel, maven, ivy, git, and emacs via yum and pulls down Paul Phillips’ excellent sbt-extras script to provide ready access to multiple versions of sbt.

Here’s how to get started using it on Fedora: first, you’ll need to install Docker and start up the Docker daemon (if you haven’t already). The gpasswd line is optional but will enable you to run Docker commands as yourself without su or sudo:

sudo yum install -y docker-io
sudo gpasswd -a ${USER} docker
sudo systemctl start docker

Then you’ll want to pull my image from the Docker Hub (run this and all Docker commands under sudo if you didn’t add yourself to the Docker group above):

docker pull willb/java-dev

You should then be able to see that the image file is installed:

docker images willb/java-dev

Now you’re ready to start a container from the image. I keep most of my ongoing development work in subdirectories of ${HOME}/devel and I’d like to mount that tree from within my container; replace ${HOME}/devel below with whatever path from the host you’d like to expose to the container:

docker run -i -t -v ${HOME}/devel:/devel willb/java-dev:centos7

This will start up the container interactively (-i), allocating a TTY (-t), mounting ${HOME}/devel from the host under /devel in the container, and giving you a zsh prompt. You can now run sbt and Maven builds and tests in a vanilla, upstream-friendly CentOS environment while still using Fedora as your desktop OS.

Regular readers of this site know that I’ve been interested in using and contributing to Apache Spark for quite some time now, and I was running Spark’s test suite when I ran into the network trouble that motivated this post. If you’re also working on Spark, you might want to try an alternate image that has a prepopulated Ivy cache for Spark’s dependencies:2

docker run -i -t -v ${HOME}/devel:/devel willb/java-dev:centos7-spark

Running your initial Spark build and tests in a container with the centos7-spark image will be faster since the necessary artifacts are baked into the image. (Without a prepopulated Ivy cache, every new container has to download all of Spark’s dependencies the first time you run a build in it.) The easiest way to build such a cache for sbt projects is to have the Dockerfile clone the relevant source repository into a temporary directory and then run sbt update from that clone. Let me know if you build other images with prepopulated dependency caches!


  1. Java networking on Linux seems to have a lot of corner cases; StackOverflow and other question/answer sites are full of people who are frustrated that InetAddress.getLocalHost() returns the loopback address instead of a LAN IP, but I had the opposite problem.

  2. This image should include all of the necessary build and test dependencies for the Spark master branch as of when I last generated it; I will update it periodically but it may not necessarily contain everything necessary to build and test the current Spark master branch in the future.