I went to Strata + Hadoop World last week. This event targets a pretty broad audience and is an interesting mix of trade show, data science conference, and software conference. However, I’ve been impressed by the quality and relevance of the technical program both of the years that I’ve gone. The key to finding good talks at this kind of event is to target talks focusing on applications, visualization, and fundamental techniques, rather than ostensibly technical talks.1 In the rest of this post, I’ll share some of my notes from the talks and tutorials I enjoyed the most.

D3.js

The first event I attended was Sebastian Gutierrez’s D3 tutorial. (Gutierrez’s slides and sample code are available online.) A great deal of this session was focused on introducing JavaScript and manipulating the DOM with D3, which was great for me; I’ve used D3 before but my knowledge of web frontend development practice is spotty and ad-hoc at best.

This was a great tutorial: the pace, scope, content, and presentation were all really excellent. It provided enough context to make sense of D3’s substantial capabilities and left me feeling like I could dive in to using D3 for some interesting projects. Finally, Gutierrez provided lots of links to other resources to learn more and experiment, both things I’d seen, like bl.ocks.org, and things that were new to me, like tributary.io and JSFiddle.

The standalone D3 tutorial on DashingD3JS.com seems like an excellent way to play along at home.

Engineering Pipelines for Learning at Scale

I attended the “Hardcore Data Science” track in the afternoon of the tutorial day; there were several interesting talks but I wanted to call out in particular Ben Recht’s talk, which mentioned BDAS in the title but was really about generic problems in developing large-scale systems that use machine learning.

Recht’s talk began with the well-known but oft-ignored maxim that machine learning is easy once you’ve turned your analysis problem into an optimization problem. I say “oft-ignored” here not because I believe practitioners are getting hung up on the easy part of the problem, but because it seems like raw modeling performance is often the focus of marketing for open-source and commercial projects alike.2 The interesting engineering challenges are typically in the processing pipeline from raw data to an optimizable model: acquiring data, normalizing and pruning values, and selecting features all must take place before modeling and classification. Learning problems often have this basic structure; Recht provided examples of these pipeline stages in object recognition and text processing domains.

The main motivating example was Recht’s attempt to replicate the Oregon State Digital Scout project, which analyzed video footage of American football and answered questions such as which plays were likely from a given offensive formation. So Recht’s team set out to stitch together video footage into a panorama, translate from video coordinates to inferred field coordinates, and track player positions and movements. This preprocessing code, which they had implemented with standard C++ and the OpenCV library, took ten hours to analyze video of a four-second NFL play. Expert programmers could surely have improved the runtime of this code substantially, but it seems like we should be able to support software engineering of machine-learning applications without having teams of experts working on each stage of the learning pipeline.

The talk concluded by presenting some ideas for work that would support software engineering for learning pipelines. These were more speculative but generally focused on using programming language technology to make it easier to develop, compose, and reason about learning pipelines.

Keynotes

Since the talks from the plenary sessions are all fairly brief (and freely available as videos), I’ll simply call out a couple that were especially enjoyable.

Domain-Specific Languages for Data Transformation

Joe Hellerstein and Sean Kandel of Trifacta gave a talk on domain-specific languages for data transformation in general and on their Wrangle system in particular. (Trifacta is also responsible for the extremely cool Vega project.) The talk led with three rules for successful data wrangling, which I’ll paraphrase here:

  • your processes, not your data, are most important;
  • reusable scripts to process data are more valuable than postprocessed and cleaned data; and
  • “agile” models in which preprocessing is refined iteratively in conjunction with downstream analysis of the processed data are preferable to having preprocessing take place as an isolated phase.3

The first act of the talk also referenced this quotation from Alfred North Whitehead, which set the tone for a survey of DSLs for data transformation and query construction:

By relieving the brain of all unnecessary work, a good notation sets it free to concentrate on more advanced problems…

In the 1990s, several data processing DSLs emerged that were based on second-order logic, proximity joins, and composable transformations.4 Hellerstein argued that, since these underlying formalisms weren’t any simpler to understand than the relational model, the DSLs weren’t particularly successful at simplifying data transformation for end-users.

Raman and Hellerstein’s Potter’s Wheel system presented a visual interface to SQL for data cleaning; it could automatically detect bad data and infer structure domains but wasn’t particularly easier to use than SQL: putting a visual, menu-driven interface over a language doesn’t generally lower the cognitive load of using that language.5

The Wrangler system (which forms the basis for the Wrangle DSL in Trifacta’s product) attacks the problem from a different angle: data cleaning operations are suggested based on explicit user guidance, semantic information about types and domains, and inferences of user intent based on prior actions. The entire problem of suggesting queries and transformations is thus expressible as an optimization problem over the space of DSL clauses. With the interface to the Wrangle DSL, the user highlights features in individual rows of the data to appear in the transformed data or in a visualization, and the system presents several suggested queries that it had inferred. The Wrangle system also infers regular expressions for highlighted text transformations, but since regular expressions — and especially machine-generated ones — are often painful to read, it translates these to a more readable (but less expressive) representation before presenting them to the user.

Kandel gave a very impressive live demo of Wrangle on a couple of data sets, including techniques for sensibly treating dirty or apparently-nonsensical rows (in this example, the confounding data included negative contribution amounts from election finance data, which represented refunds or revised accounting). This is excellent work at the intersection of databases, programming languages, machine learning, and human-computer interaction — which is a space that I suspect a lot of future contributions to data processing will occupy.

Clustering for Anomaly Detection

Sean Owen’s talk on using Spark and k-means clustering for anomaly detection was absolutely packed. Whether this was more due to the overwhelming popularity of talks with “Spark” in the title or because Sean is widely known as a sharp guy and great presenter, I’m not sure, but it was an excellent talk and was about more than just introducing clustering in Spark. It was really a general discussion of how to go from raw data to a clustering problem in a way that would give you the most useful results — Spark was essential to make the presentation of ETL and clustering code simple enough to fit on a slide, but the techniques were generally applicable.

The running example in this talk involved system log data from a supervised learning competition. Some of the log data was generated by normal activity and some of it was generated by activity associated with various security exploits. The original data were labeled (whether as “normal” or by the name of the associated exploit), since they were intended to evaluate supervised learning techniques. However, Owen pointed out that we could identify anomalies by clustering points and looking at ones that were far away from any cluster center. Since k-means clustering is unsupervised, one of Owen’s first steps was to remove the labels.

With code to generate an RDD of unlabeled records, Owen then walked through the steps he might use if he were using clustering to characterize these data:

  • The first question involves choosing a suitable k. Since Owen knew that there were 23 different kinds of records (those generated by normal traffic as well as by 22 attack types), it seems likely that there would be at least 23 clusters. A naïve approach to choosing a cluster count (viz., choosing a number that minimizes the distance from each point to its nearest centroid) falls short in a couple of ways. First, since cluster centers are originally picked randomly, the results of finding k clusters may not be deterministic. (This is easy to solve by ensuring that the algorithm runs for a large number of iterations and has a small distance threshold for convergence.) More importantly, though, finding n clusters, where n is the size of the population, would be optimal under this metric, but it would tell us nothing.
  • The vectors in the raw data describe 42 features but the distance between them is dominated by two values: bytes read and bytes written, which are relatively large integer values (as opposed to many of the features, which are booleans encoded as 0 or 1). Normalizing each feature by its z-score made each feature contribute equally to the distance computation (see the sketch after this list).
  • Another useful trick is encoding features whose space is a finite domain of n elements as n boolean features (where only one of them will be true). So if your feature could be either apple, banana, cantaloupe, or durian in the raw data, you could encode it as four boolean-valued features, one of which is true if and only if the feature is apple, one of which is true if and only if the feature is banana, and so on.
  • Using entropy (or normalized mutual information) to evaluate the clustering can be a better bet, since fitness measured by entropy won’t keep improving all the way to k = n the way fitness measured by Euclidean distance will. Another option is restoring the labels from the original data and verifying that clusters typically have homogeneous labels.
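
To make the normalization and encoding tricks above concrete, here’s a minimal sketch in Spark’s Scala API. The LogRecord type, its fields, and the feature choices are invented for illustration; they don’t correspond to the competition data or to Owen’s actual code:

import org.apache.spark.SparkContext._   // implicit conversions for RDD[Double] on older Spark versions
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// hypothetical log record: one categorical field plus two large numeric counters
case class LogRecord(protocol: String, bytesRead: Double, bytesWritten: Double)

def toFeatures(data: RDD[LogRecord]): RDD[Vector] = {
  // one-hot encode the categorical field: one 0/1 feature per distinct value
  val protocols = data.map(_.protocol).distinct.collect.sorted
  val protocolIndex = protocols.zipWithIndex.toMap

  // per-feature mean and standard deviation for z-score normalization
  val n = data.count.toDouble
  val meanR = data.map(_.bytesRead).sum / n
  val meanW = data.map(_.bytesWritten).sum / n
  val stdevR = math.sqrt(data.map(r => math.pow(r.bytesRead - meanR, 2)).sum / n)
  val stdevW = math.sqrt(data.map(r => math.pow(r.bytesWritten - meanW, 2)).sum / n)

  data.map { r =>
    val oneHot = Array.tabulate(protocols.size) { i =>
      if (protocolIndex(r.protocol) == i) 1.0 else 0.0
    }
    val scaled = Array((r.bytesRead - meanR) / stdevR, (r.bytesWritten - meanW) / stdevW)
    Vectors.dense(oneHot ++ scaled)
  }
}

The resulting vectors could then be handed to MLlib’s KMeans.train, and scanning a range of values of k with KMeansModel.computeCost is one way to implement the naïve “minimize distance to the nearest centroid” search described above.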

Summary

Many of the talks I found interesting featured at least some of the following common themes: using programming-language technology, improving programmer productivity, or focusing on the pipeline from raw data to clean feature vectors. I especially appreciated Sean Owen’s talk for showing how to refine clustering over real data and demonstrating substantial improvements by a series of simple changes. As Recht reminds us, the optimization problems are easy to solve; it’s great to see more explicit focus on making the process that turns analysis problems into optimization problems as easy as possible.


  1. Alas, talks with technical-sounding abstracts often wind up having a nontrivial marketing component!

  2. This is related to the reason why “data lakes” have turned out to be not quite as useful as we might have hoped: a place to store vast amounts of sparse, noisy, and irregular data and an efficient way to train models from these data are necessary but not sufficient conditions for actually answering questions!

  3. Hellerstein pointed out that the alternative to his recommendation is really the waterfall model (seriously, click that link), which makes it all the more incredible that anyone follows it for data processing. Who would willingly admit to using a practice initially defined as a strawman?

  4. SchemaSQL and AJAX were provided as examples.

  5. I was first confronted with the false promise of visual languages in the 1980s. I had saved my meager kid earnings for weeks and gone to HT Electronics in Sunnyvale to purchase the AiRT system for the Amiga. AiRT was a visual programming language that turned out to be neither more expressive nor easier to use than any of the text-based languages I was already familiar with. With the exception of this transcript of a scathing review in Amazing Computing magazine (“…most of [its] potential is not realized in the current implementation of AiRT, and I cannot recommend it for any serious work.”), it is now apparently lost to history.

leitmotif is a very simple templating tool that generates directories from prototypes stored in git repositories. Its design prioritizes simplicity and a minimal set of external dependencies.

While the leitmotif tool is still under development and some interesting features are planned for future work, it is already useful for many project-templating tasks. This post will show you how to get started.

Installing Leitmotif

You can install leitmotif either by using the RubyGem or by using the standalone version, which only depends on Ruby 2 and the Thor gem.

To install the RubyGem, simply run gem install leitmotif.

To install the standalone script, run the following commands:

gem install thor --version=0.17 && \
    curl -O https://github.com/willb/leitmotif/raw/master/extra/standalone/leitmotif && \
    chmod 755 leitmotif

In the future, I expect that leitmotif packages will be available for CentOS and Fedora.

Using Leitmotif

The leitmotif tool is self-documenting. Run leitmotif help to see options.

Copying a prototype from a remote repository

Use leitmotif clone URL to make a local copy of the remote prototype repository at URL in your local Leitmotif prototype store (under your home directory).

Listing locally-installed prototypes

Use leitmotif list to see the prototypes you have installed in your local store.

Instantiating prototypes

Use leitmotif generate PROTOTYPE OUTPUT_DIR to instantiate PROTOTYPE in OUTPUT_DIR. In this form, PROTOTYPE must be the path to a git repository containing a Leitmotif prototype. This command supports the following options:

  • --local: treat PROTOTYPE as the name of a prototype repository in the local store rather than as a path
  • --clobber: delete OUTPUT_DIR before processing the prototype, if it exists
  • --ref: a git tag, branch, or SHA to use from the prototype repository (defaults to master if unspecified)
  • --bindings KEY:VALUE ...: a list of variable bindings to use while instantiating the prototype
  • --verbose: provide additional output as leitmotif works

Creating Leitmotif prototypes

A Leitmotif prototype is just a git repository with a particular structure. Specifically, a prototype must have two entries in the repository root directory:

  1. a YAML file named .leitmotif that contains metadata about the prototype, and
  2. a directory named proto, which contains the prototype itself.

Prototype metadata

A .leitmotif file is just a YAML representation of a hash of metadata options. The following keys can appear in a .leitmotif file:

  • :name: the name of the prototype (used for documentation)
  • :version: the version of the prototype (used for documentation)
  • :required: a list of variables that must have user-provided values when the prototype is instantiated
  • :ignore: a list of files to copy to the instantiated prototype without processing by the templating engine
  • :defaults: a hash consisting of default values for prototype variables

Here’s an example .leitmotif file, from a prototype for Spark application development:

---
:name: sparkdev
:version: '0'
:required:
  - name
:ignore:
  - sbt
:defaults:
  :version: '0.0.1'
  :scala_version: '2.10.4'
  :spark_version: '1.1.0'

Prototypes and templating

With the exception of the files in the :ignore list, all of the files in a prototype repository are processed by ERB after they’re copied to the output directory. For more details on eRuby, see elsewhere, but here are a few basics to get you started:

  • If a template file contains Ruby code surrounded by <% and %>, that code is evaluated at prototype instantiation time with user-supplied variable bindings.
  • If a template file contains Ruby code surrounded by <%= and %>, that code is evaluated at prototype instantiation time with user-supplied variable bindings and the result of evaluating that code is substituted into the document at that point.

Combining these two, we can see how to use loops in a file:

Specifications of properties are often terser than explicit expected results.  For example:

<% 99.downto(1).each do |bottles| %>
   * <%= bottles %> bottles of beer on the wall, 
     <%= bottles %> bottles of beer; 
     take one down, pass it around, 
     <%= bottles > 1 ? "#{bottles} bottles" : "just one bottle" %> of beer on the wall
<% end %>

The above will generate a Markdown file containing a bulleted list that will strike terror into the heart of any adult who has ever been on a bus full of middle-schoolers.

Coming soon

I wrote this tool to solve an immediate need1 and will be updating it as new requirements become apparent. However, there are a few things that are already on my roadmap:

  • automated test coverage (currently — and shamefully! — there is none)2
  • additional commands, e.g., to inspect installed prototypes
  • post-instantiation actions, e.g., to rename a file or create a directory based on the result of a variable expansion

Of course, I welcome your feedback, issue reports, and patches as well.


  1. Specifically, my need to be lazy when creating new projects.

  2. I’m probably getting too old to program in untyped languages.

I’ve written in the past about what a mistake it is to add behavioral incentives or morality clauses to the licenses for open-source projects. Briefly, these clauses are bad:

  • philosophically because they infringe on the basic software freedom to use a freely-available project for any purpose1;
  • legally because they generally rely on vague and subjective terms, (paradoxically) rendering your license both radioactive and unenforceable; and
  • practically because instead of preventing people from doing things that you don’t like, they just prevent people — even those who wouldn’t be inclined to do things you don’t like! — from using your work.

One software domain in which licenses with usage restrictions are commonplace (and, unfortunately, widely accepted) is digital fonts. This has historically been a problem for Fedora, since many “free” fonts are not open-source (in that they do not permit modification) or have usage restrictions (in particular, against “commercial” use). Furthermore, much like novel one-off open-source software licenses, most partially-free licenses authored by font creators are unlikely to be unambiguous or legally sensical.2

The problem gets far worse when we look at commercial fonts, for which the type of application and type of output are often incorporated in usage restrictions. Last year, I licensed a font whose EULA was self-contradictory; you can click the link for the whole story, but the main problem was that it claimed to broadly allow rasterization, including use to “create images on computer screens, paper, web pages, photographs, movie credits, printed material, T-shirts, and other applications,” but that I couldn’t use the font “as part of broadcasting video or film […] the use of the font software in titling, credits or other text for any onscreen broadcast via television, video, or motion picture.”

Since one of my creative endeavors is editing home and bicycling videos, the combination of being allowed to “create … movie credits” but not to “use the font software in titling, credits or other text for … motion picture” was pretty frustrating, especially since the EULA hadn’t been obviously available for review until after I licensed the face. (There were numerous other problems, ambiguities, and contradictions with the license, and when I asked the foundry for a clarification, they curtly denied that there was anything to be confused about.)

For more than a year and a half, this inconsistent license has been my benchmark to evaluate whether or not a font is more trouble than it’s worth, and I’ve at least learned to be more careful reading font EULAs.3 However, I recently came across an example4 that so completely outclasses the contradictory license that I can’t imagine it will be replaced as the new standard for terrible usage clauses in licenses. Here’s an excerpt; the author’s name is redacted:

Fonts by [redacted] may NOT be used for pornographic, derogatory, defamatory or racist material (in any form, printed or virtual); fonts by [redacted] may NOT be used by individuals or companies involved in child abuse, child pornography or child labour; fonts by [redacted] may NOT be used by individuals or companies involved in destruction of natural resources and/or habitat, logging (even legal), palm oil exploitation/harvesting, tuna fishing, whaling, animal trafficking, oil and/or gas drilling or transporting and mining. Fonts by [redacted] may NOT be used by individuals or companies promoting an unhealthy lifestyle (fast food, energy drinks, foods containing GM ingredients). Fonts by [redacted] may NOT be used by companies or individuals involved in Genetic Modification / Genetic Alteration of organisms. Fonts by [redacted] may NOT be used by individuals or companies involved in fur trade, or making use of fur. Fonts by [redacted] may NOT be used by missionaries, individuals or institutions of any creed or faith for the purpose of converting others to their creed or faith. Fonts by [redacted] may NOT be used to instigate terror, hate, intolerance, fear or racism.

I almost don’t know where to begin with this one, since it raises so many questions for the licensor:

  • When do we cross the line from light-hearted ribbing to “derogatory material”? Are satire and parody proscribed?
  • Say I use this font to title a bicycling video; would the licensor need to see what sort of chain lubricant I used or whether or not I was a regular inner-tube recycler before determining that I wasn’t involved in “destruction of natural resources”? (What about people who eat animal protein? Or plants?)
  • Are we mostly worried about the Indonesian orangutan, or is (relatively-low-impact) West African palm oil OK? Is the palm oil industry full of typophiles? How many jars of palm oil bore the licensor’s glyphs before he added this clause?
  • How are we defining “unhealthy” or “genetic modification”? It seems like consensus regularly changes on the former and the latter is — like much of the language in this clause — actually kind of vague despite being deployed in an absolute manner. Can I replant seeds from my best tomatoes or must I toss them? (In a related question, since I have the suspicion that it might come up in a future revision of this license: is it OK to use this font to promote vaccination?)
  • What about people of faith whose creed includes a concept of vocation, or the notion that they serve God through their creative and professional work?

Hopefully, these kinds of questions — along with many more like them that you may be asking yourself — underscore why this kind of license is a problem: the licensor probably hoped to make a clear statement of principles, condemn what he saw as social ills, and avoid assisting people and groups he’d find objectionable. Instead, the license restrictions are so broad as to exclude use by anyone except the font’s creator (who is unlikely to sue himself for breach of contract). No matter who you are or what you do, if the licensor wanted to sue you, he probably could.

I’m inclined to excuse that last sentence, though, since the licensor seems to know a thing or two about instigating “hate, intolerance, [and] fear.”


  1. The GNU Project calls this “Freedom 0.”

  2. If you’re releasing open-source software, you should use a well-known license. If you want to release a font freely, you should have a very good reason for using anything except the Open Font License or the LPPL.

  3. This may be surprising to people used to regular software licensing, but I’ve seen fonts for sale in different places with different licenses! It seems like some sellers have standard EULAs and require font creators to allow distribution under these, while others maintain the creators’ EULAs.

  4. Alas, I came across this example after paying for a license.

As everyone knows, internet comments are pure, unadulterated evil.1 But it’s nonetheless nice when blogs give readers an easy way to provide feedback. I recently reworked my Octopress installation to allow readers to reply to my posts via Twitter or email; this post will show you how to set it up for yourself. We’ll be using Twitter’s Web intents API to make it happen.

Step 1: Configuration

If you haven’t already set your Twitter username in your Octopress _config.yml, do so now; here’s what mine looks like:

excerpted from _config.yml
# Twitter
twitter_user: willb
twitter_tweet_button: false

Step 2: Create a new partial layout

The next thing we’ll do is add a new partial layout that generates reply-via-Twitter links. Create source/_includes/post/feedback.html in your Octopress directory and give it the following contents:

source/_includes/post/feedback.html
 • You may
 {% if site.twitter_user and page.skip_twitter_feedback != true %}
    <script type="text/javascript" src="//platform.twitter.com/widgets.js"></script>
    {% capture tweet_text %}@{{site.twitter_user}} re: {{site.url}}{{page.url}}{% endcapture %}
  <a href="https://twitter.com/intent/tweet?text={{ tweet_text | uri_escape }}">reply to this post on Twitter</a> or
 {% endif %}
<a href="mailto:{{site.email}}">email the author</a>.

Note that this will always provide an email feedback link, targeting the site-wide email variable from your _config.yml.2 It will provide reply-via-twitter links if you’ve set the twitter_user configuration variable and haven’t set skip_twitter_feedback: true in your post’s YAML front matter. (I use this to eliminate redundancy for cases like this, in which I’ve asked for replies via Twitter in the body of the post.) When a reader clicks on the reply-via-twitter link, it will take them to Twitter3 with a prepopulated tweet:

Web Intent

Step 3: Add the new partial to your post template

Finally, edit your post template to ensure that it includes feedback.html in an appropriate place. This will obviously depend on your theme and your taste, but I’ve included my post template — which is based on octostrap3 and includes the feedback links directly following the category list — as an example; see line 19:

source/_layouts/post.html
---
layout: default
navbar: Blog
single: true
---

<div class="row">
  <div class="page-content {% unless page.sidebar == false or site.post_asides == empty and site.default_asides == empty %}col-md-9{% else %}col-md-12{% endunless %}" itemscope itemtype="http://schema.org/Blog">
    <meta itemprop="name" content="{{site.title}}" />
    <meta itemprop="description" content="{{site.description}}" />
    <meta itemprop="url" content="{{site.url}}" />
    <article class="hentry" role="article" itemprop="blogPost" itemscope itemtype="http://schema.org/BlogPosting">
      {% include article.html %}
      <footer>
        <p class="meta text-muted">
          {% include post/author.html %}
          {% include post/date.html %}{% if updated %}{{ updated }}{% else %}{{ time }}{% endif %}
          {% include post/categories.html %}
          {% include post/feedback.html %}
        </p>
        {% unless page.sharing == false %}
          {% include post/sharing.html %}
        {% endunless %}
        {% if page.previous.url or page.next.url %}
          <ul class="meta text-muted pager">
            {% if page.previous.url %}
            <li class="previous"><a href="{{page.previous.url}}" title="Previous Post: {{page.previous.title}}">&laquo; {{page.previous.title}}</a></li>
            {% endif %}
            {% if page.next.url %}
            <li class="next"><a href="{{page.next.url}}" title="Next Post: {{page.next.title}}">{{page.next.title}} &raquo;</a></li>
            {% endif %}
          </ul>
        {% endif %}
      </footer>
    </article>

    {% if site.disqus_short_name and page.comments == true %}
      <section>
        <h1>Comments</h1>
        <div id="disqus_thread" aria-live="polite">{% include post/disqus_thread.html %}</div>
      </section>
    {% endif %}
  </div>

  {% unless page.sidebar == false or site.post_asides == empty and site.default_asides == empty %}
  <aside class="sidebar col-md-3">
    {% if site.post_asides.size %}
      {% include_array post_asides %}
    {% else %}
      {% include_array default_asides %}
    {% endif %}
  </aside>
  {% endunless %}
</div>

Step 4: There is no Step 4

I hope this was helpful! If you have feedback on this post, I bet you can figure out where to find me.


  1. Only mostly kidding.

  2. If you haven’t set this variable, you can replace {{site.email}} with your email address. You can also replace the whole mailto link with the results of some obfuscation technique like the Hivelogic Enkoder if you still care about concealing your email address from spambots.

  3. By default, it will open this in a pop-up window unless your users are blocking Javascript from Twitter. If you’re interested in avoiding pulling down scripts from external sites, eliminating the script link to platform.twitter.com/widgets.js will only disable the pop-up window, not affect reply-via-tweet functionality in general. (You can, of course, use well-known techniques to open the link in a pop-up window without the external script.)

One of my side projects this year has been using Apache Spark to make sense of my bike power meter data. There are a few well-understood approaches to bike power data modeling and analysis, but the domain has been underserved by traditional machine learning approaches, and I wanted to see if I could quickly develop some novel techniques. My experiments so far are available as open-source code and they have been quite successful in two ways: not only have I mined some useful information from my rides, but I’ve also demonstrated that Spark is responsive enough to be a good substitute for R in exploratory use cases.

However, the code I’ve produced up to this point has really been more focused on experimenting with analytic techniques rather than on scalable implementations. I’ve valued obvious correctness and simplicity over performance, in some cases making implementation decisions that were obviously suboptimal in the interest of having simple implementations. (The blessing and the curse of declarative programming is that you’re insulated, to some extent, from how your application actually executes!)

Since publishing the code, giving talks about it, and demoing it, I’ve decided that the basic approach is fairly sensible. I’ve also seen a lot of interest from teammates and other cyclists who’d like to analyze their own data, and I’d like to eventually make the code available as a public-facing service — which means that it’s time to adjust my tradeoffs to favor efficiency a bit more.

In this post, we’ll look at several minor changes (and one major change) that improved my code’s performance, identifying the common issues that each uncovered. This certainly won’t be an exhaustive catalog of Spark optimization tips, and I’m not claiming that the code I wound up with is perfect, but keeping these issues in mind should improve your Spark apps. We’ll cover using broadcast variables to distribute data to workers, using caching sensibly, being sensitive to data shuffles, and designing applications to take advantage of laziness.

Background

The particular code we’re looking at is from one of the applications I discussed at Spark Summit this year.1 The question this app aims to answer is “given historical ride data, where am I likely to get my best mean n-second power?” Its basic approach is:

  1. load activity data from disk, transforming it into a collection of Trackpoint records,
  2. optimize a k-means clustering (with a relatively large k) for the spatial locations of each Trackpoint,
  3. find the mean power for each overlapping n-second window of each activity,
  4. for each pair (i, j) of spatial clusters, reject all but the best n-second efforts starting in cluster i and ending in cluster j, and
  5. finally, plot the trackpoints from the best remaining 20 efforts

The Trackpoint structure is designed to capture all the metrics2 in each sample recorded in a Garmin TCX file; it is a Scala case class with timestamp, coordinates, altitude, wattage, and activity identity. Once we’ve processed our input files into Trackpoint records, we’ll group these records by activity, giving us a collection of pairs that encode a mapping from activity names to sequences of trackpoints, like this:

Schematic of per-activity trackpoint collections
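
For reference, a record along these lines might look roughly like the following case class; the field names and types here are guesses based on the description above rather than the actual definition in the sur-la-plaque source:

// sketch of a trackpoint record; the real case class lives in
// com.freevariable.surlaplaque.data and may differ in detail
case class Coordinates(lat: Double, lon: Double)

case class Trackpoint(
  timestamp: Long,          // when the sample was recorded
  latlong: Coordinates,     // where the sample was recorded
  altitude: Double,         // elevation in meters
  watts: Double,            // instantaneous power reading
  activity: Option[String]  // identifier of the activity this sample belongs to
)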

Since dealing with windowed data is common to several of my bike data applications, I had factored it out into a common trait:

excerpted from common.scala
trait ActivitySliding {
  import org.apache.spark.rdd.RDD
  import com.freevariable.surlaplaque.data.Trackpoint

  def windowsForActivities[U](data: RDD[Trackpoint], period: Int, xform: (Trackpoint => U) = identity _) = {
    val pairs = data.groupBy((tp:Trackpoint) => tp.activity.getOrElse("UNKNOWN"))
    pairs.flatMap({case (activity:String, stp:Seq[Trackpoint]) => (stp sliding period).zipWithIndex.map {case (s,i) => ((activity, i), s.map(xform))}})
  }

  def identity(tp: Trackpoint) = tp
}

The windowsForActivities function maps the data into overlapping period-sample windows. Since it’s not possible, in general, to efficiently calculate sliding windows over RDDs3, we operate on materialized sequences of trackpoints, and return pairs representing a mapping from pairs of activity identifiers and sample offsets to sequences representing sample windows, like this:

Diagram of sample windows
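
If the sliding-window behavior of Scala collections is unfamiliar, here’s what it does to a toy sequence (unrelated to the trackpoint data):

// sliding(3) produces every contiguous window of three elements, in order;
// zipWithIndex then tags each window with its offset, just as windowsForActivities does
List(1, 2, 3, 4, 5).sliding(3).zipWithIndex.toList
// => List((List(1, 2, 3), 0), (List(2, 3, 4), 1), (List(3, 4, 5), 2))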

The windowsForActivities function also takes an optional transformation function argument (xform), which can be used to map Trackpoint records after splitting them into sliding windows. In my case, I use these to discard metrics data that isn’t relevant to a given application. Since our first approach to finding best efforts involves keeping every effort window around until we know which ones we’ll ultimately care about,4 going to a leaner trackpoint structure can save a lot of memory and improve performance. In the case of this application and my data, translating trackpoint records to simpler records (containing only latitude, longitude, and wattage) resulted in approximately a 2x speedup on my data.
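
As a sketch of what that leaner structure might look like (the LeanPoint name is invented here, and the actual stripTrackpoints helper in the repository may differ):

// a minimal record carrying only what this application needs
case class LeanPoint(latlong: Coordinates, watts: Double)

def stripTrackpoints(tp: Trackpoint): LeanPoint = LeanPoint(tp.latlong, tp.watts)

// passed as the window transformation, so each window holds lean records:
//   windowsForActivities(data, period, stripTrackpoints _)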

In the rest of this post, we’ll look at the initial approach and some iterative refinements. For each refinement, we’ll look at the general Spark application performance issues it uncovered as well as the particular speedups I saw. (We’ll also look at a change that I thought would improve performance but didn’t.) First, though, a word of caution: I tried to do meaningful and repeatable timings, but I was working on my personal workstation (hardly a controlled environment) and didn’t do anything resembling a rigorous performance evaluation. Furthermore, your mileage may vary; be sure to thoroughly test improvements on your own applications and data!

The first pass

We’ll be looking at a small function in which PowerBestsApp spends most of its time; this is the function that finds the best n-sample efforts after filtering out all but the best effort starting and ending in a given pair of clusters. Initially, it looks like this:

excerpted from power_bests.scala
def bestsForPeriod(data: RDD[Trackpoint], period: Int, app: SLP, model: KMeansModel) = {
  val windowedSamples = windowsForActivities(data, period, stripTrackpoints _).cache
  val clusterPairs = windowedSamples
    .map {case ((activity, offset), samples) => ((activity, offset), (closestCenter(samples.head.latlong, model), closestCenter(samples.last.latlong, model)))}
  val mmps = windowedSamples.map {case ((activity, offset), samples) => ((activity, offset), samples.map(_.watts).reduce(_ + _) / samples.size)}

  val top20 = mmps.join(clusterPairs)
   .map {case ((activity, offset), (watts, (headCluster, tailCluster))) => ((headCluster, tailCluster), (watts, (activity, offset)))}
   .reduceByKey ((a, b) => if (a._1 > b._1) a else b)
   .map {case ((headCluster, tailCluster), (watts, (activity, offset))) => (watts, (activity, offset))}
   .sortByKey(false)
   .take(20)

  app.context.parallelize(top20, app.context.defaultParallelism * 4)
   .map {case (watts, (activity, offset)) => ((activity, offset), watts)}
   .join (windowedSamples)
   .map {case ((activity, offset), (watts, samples)) => (watts, samples)}
   .collect
}

When we invoke bestsForPeriod, we already have an RDD of trackpoint records corresponding to every sample in every activity under consideration. We have also optimized a set of k cluster centers (I typically use k=128 for my data) and have the resulting model available. In the body of this function, we:

  • calculate and cache the overlapping sample windows for each activity, converting each Trackpoint record (with the stripTrackpoints function, declared elsewhere) into a minimal record consisting only of latitude, longitude, and wattage (line 2),
  • we declare clusterPairs, an RDD of pairs in which window identifiers (that is, activity-offset pairs) are the keys and pairs of cluster centers (one for the starting cluster and one for the ending cluster) are the values (lines 3—4),
  • we declare mmps, an RDD of pairs in which the keys are window identifiers and the values are mean wattages for the corresponding window of samples (line 5),
  • we then find the top twenty efforts, only considering the best effort starting and ending in a given pair of clusters:
    • first, we join mmps with clusterPairs, giving us an RDD mapping from window identifiers to tuples of mean wattages and start-end cluster pairs (line 7),
    • we transform this mapping so that it’s keyed by the start-end clusters of each window (line 8),
    • and reject all but the best effort for each start-end cluster pair (line 9),
    • we then re-key the resulting pairs by mean wattage (line 10),
    • and sort by mean wattage in descending (hence false) order (line 11)
    • now, up until this point, we haven’t actually done anything besides construct a directed acyclic graph of RDD dependencies, since we’ve exclusively invoked RDD transformations, which are lazy.5 However, in line 12, we’re going to force computation of the mean wattage-window identifier pairs and send the twenty best ones back to the driver program
  • we convert the collection of pairs back to an RDD (line 14; note that we’re assuming low default parallelism or possible expansion to consider more than 20 best efforts), and immediately transpose each pair to re-key by window identifier (line 15),
  • join this collection with the windows themselves (line 16), resulting in a map from window identifiers to tuples of mean wattages and the samples themselves6 that contains only our best 20 efforts, and
  • strip the window identifier from each tuple, leaving only pairs of wattages and windows (line 17) before firing off a Spark job to actually do the computations we described in lines 14—17, returning the results to the driver program (line 18)

This code runs quickly enough for prototyping, but it could be a lot faster. Experienced Spark programmers likely see several opportunities for improvement here; hopefully the changes we’re about to make will cover most of them.

Broadcasting: a simple win

The first and easiest change we can make is to ensure that we aren’t passing too much data in closure environments, since that data has to be serialized and shipped to workers (in my case, on a single multiprocessor workstation, but potentially across a cluster). Line 4 in our original function relied on having a KMeansModel available in the closure’s environment. Now, a k-means model for 128 cluster centers isn’t huge by any means, but it’s also not a trivial thing to have to serialize along with a relatively small closure. By passing the model in a Spark broadcast variable instead, we can reduce the cost of creating and executing the RDD transformations that depend on it:

excerpted from power_bests.scala
def bestsForPeriod(data: RDD[Trackpoint], period: Int, app: SLP, model: Broadcast[KMeansModel]) = {
  val windowedSamples = windowsForActivities(data, period, stripTrackpoints _).cache
  val clusterPairs = windowedSamples
    .map {case ((activity, offset), samples) => ((activity, offset), (closestCenter(samples.head.latlong, model.value), closestCenter(samples.last.latlong, model.value)))}

    // ... rest of function omitted
}

Note that all I did here was change bestsForPeriod to expect a Spark broadcast variable instead of a raw KMeansModel, and then pass model.value to the closestCenter function where it had previously expected a KMeansModel. This very simple change resulted in an approximately 1.1x speedup on my code. If you have large read-only data (whether bigger models or anything else) that needs to go to Spark workers with closure arguments, you might see even bigger improvements.

To create a broadcast variable, simply call the broadcast method of SparkContext with the value you wish to broadcast as a parameter:

Broadcast variable example
scala> val evenNumbers = (0 to 100000 by 2).toArray
evenNumbers: Array[Int] = Array(0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198, 200, 202, 204, 206, 208, 210, 212, 214, 216, 218, 220, 222, 224, 226, 228, 230, 232, 234, 236, 238, 240, 242, 244, 246, 248, 250, 252, 254, 256, 258, 260, 262, 264, 266, 268, 270, 272, 274, 276, 278, 280, 282, 284, 286, 288, 290, 292, 294, 296, 298, 300, 302, 304, 306, 308, 310, 312, 314, 316, 318, 320, 322, 324, 326, ...
scala> val broadcastEvens = sc.broadcast(evenNumbers)
14/09/09 14:45:40 INFO storage.MemoryStore: ensureFreeSpace(200048) called with curMem=0, maxMem=9239474995
14/09/09 14:45:40 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 195.4 KB, free 8.6 GB)
broadcastEvens: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
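
To use the broadcast value on workers, dereference it with .value inside the closure that needs it; for example, continuing the session above (untested, but the pattern is the important part):

// only elements present in the broadcast array survive the filter
val someEvens = sc.parallelize(1 to 10).filter(n => broadcastEvens.value.contains(n)).collect()
// => Array(2, 4, 6, 8, 10)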

Eliminating counterproductive caching

One of the big benefits to using Spark over other distributed computing frameworks is that intermediate results can be cached in cluster memory. However, caching can be both a gift and a curse: it’s always possible to cache something, even something you use many times, whose impact on your memory usage far outweighs the cost of recomputing it. In our case, the largest collection we deal with is the windowedSamples RDD, which, depending on the window size, can be hundreds of times larger than the input data. Actually computing the windows is not particularly expensive, they aren’t used many times, and we use only a tiny fraction of the windows at the end of the function in any case. By eliminating the .cache call on windowedSamples, we acknowledge that it is cheaper to recompute the few windows we’ll need to examine more than once than it is to dedicate the memory necessary to keeping the windows around. This change resulted in an approximately 1.75x speedup on my code.

I hesitate to call this a simple fix, since it requires some insight into your application. But as we’ve already seen with the Trackpoint transformation, memory pressure can make a huge difference to Spark applications. Just because you can cache something doesn’t mean you should. (This is a lesson that users of tabled Prolog systems learn quickly: if you start by tabling — that is, memoizing — every relation in your database, you’ll have a fast, expressive environment until everything explodes and your computer becomes a pile of glowing sand.)
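
When an intermediate result really is worth keeping, remember that Spark also gives you more control than a bare .cache call; here’s a quick sketch (someExpensiveRdd is just a placeholder for whatever RDD you decided to keep):

import org.apache.spark.storage.StorageLevel

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY); spilling to disk
// trades some recomputation time for reduced memory pressure
someExpensiveRdd.persist(StorageLevel.MEMORY_AND_DISK)

// ... use someExpensiveRdd several times ...

// release the cached blocks once they are no longer needed
someExpensiveRdd.unpersist()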

Being sensitive to shuffles

Recall that when Spark actually executes a job, it pipelines as many individual tasks as possible into stages, which can be executed on workers without coordinating or shuffling data.7 Ideally, stages will consist of several tasks to allow for high utilization; intuitively, we want workers to spend far more time computing results than coordinating with the driver. However, when RDD transformations change the keys in a collection of pairs (for example), we force Spark to coordinate between workers, shuffling RDD elements around in order to repartition them.
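
One way to see where those stage boundaries fall is to inspect an RDD’s lineage before running a job; here’s a tiny example (not from the bike-data code):

import org.apache.spark.SparkContext._  // pair-RDD implicits on older Spark versions

val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"))

// map and filter are narrow transformations: they pipeline into a single stage
val pairs = words.filter(_ != "c").map(w => (w, 1))

// reduceByKey repartitions by key, introducing a shuffle (and a new stage)
val counts = pairs.reduceByKey(_ + _)

// toDebugString prints the lineage; the ShuffledRDD marks the stage boundary
println(counts.toDebugString)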

An interesting non-improvement

Since our application deals with some fairly large collections (windowedSamples), shuffling RDDs can be especially painful. However, because of the way we want to use the data, it’s hard to eliminate all of the shuffles in our method. So instead, we’ll examine the impact of the shuffle by looking at another version of bestsForPeriod that starts by fusing the two transformations on windowedSamples (from lines 3—5 in our original excerpt) into one transformation:

excerpted from power_bests.scala with transformation fusion
def bestsForPeriod(data: RDD[Trackpoint], period: Int, app: SLP, model: Broadcast[KMeansModel]) = {
  val windowedSamples = windowsForActivities(data, period, stripTrackpoints _)
  val bests = windowedSamples.map {
    case ((activity, offset), samples)  =>
      (
        (closestCenter(samples.head.latlong, model.value), closestCenter(samples.last.latlong, model.value)),
        (samples.map(_.watts).reduce(_ + _) / samples.size, (activity, offset))
      )
  }

  val top20 = bests.reduceByKey ((a, b) => if (a._1 > b._1) a else b)
   .map {case ((headCluster, tailCluster), (watts, (activity, offset))) => (watts, (activity, offset))}
   .sortByKey(false)
   .take(20)

  app.context.parallelize(top20, app.context.defaultParallelism * 4)
   .map {case (watts, (activity, offset)) => ((activity, offset), watts)}
   .join (windowedSamples)
   .map {case ((activity, offset), (watts, samples)) => (watts, samples)}
   .collect
}

This transformation fusion essentially eliminates one of the map transformations and also a join, giving us a collection of tuples of start-end cluster pairs and pairs of mean wattages and window identifiers. Interestingly, eliminating these transformations made the application slower! In fact, it ran approximately 19% slower than the previous version. However, this code affords more opportunities to eliminate re-keying and shuffles, and in so doing, we can once again achieve comparable performance to the previous version.

Here’s where the code wound up after several transformations:

excerpted from improved power_bests.scala with transformation fusion
def bestsForPeriod(data: RDD[Trackpoint], period: Int, app: SLP, model: Broadcast[KMeansModel]) = {
  val windowedSamples = windowsForActivities(data, period, stripTrackpoints _)

  val bests = windowedSamples.map {
    case ((activity, offset), samples)  => (
      (closestCenter(samples.head.latlong, model.value), closestCenter(samples.last.latlong, model.value)),
      ((activity, offset), samples.map(_.watts).reduce(_ + _) / samples.size)
    )
  }.cache

  val top20 = bests.reduceByKey ((a, b) => if (a._2 > b._2) a else b)
   .map { case ((_, _), keep) => keep }
   .takeOrdered(20)(Ordering.by[((String, Int), Double), Double] { case ((_, _), watts) => -watts})

  app.context.parallelize(top20, app.context.defaultParallelism * 4)
   .join (windowedSamples)
   .map {case ((activity, offset), (watts, samples)) => (watts, samples)}
   .collect
}

Note that I’ve eliminated a lot of tuple transposition and other unnecessary re-keyings. In particular, one of the biggest wins was to use the takeOrdered method with an explicit Ordering to avoid re-keying tuples on mean wattages in order to use take(n). (There are still some tuple re-keyings that could be eliminated; in particular the map on line 19 could probably be handled more efficiently in the driver in many cases, but we’re talking about marginal gains at this point.)

After these changes, the fused-transformation implementation performed almost exactly as well in terms of wall-clock time as the previous implementation. Since the prior implementation was slightly higher-level, I decided I’d prefer it as a basis for future efforts, but I kept the takeOrdered instead of take(n), which resulted in a small (~5%) speedup.8

Lazily materializing sample windows

The biggest improvement I made was the most invasive, but it addressed the most obvious deficiency of the initial implementation: specifically, it’s terribly wasteful to store and shuffle sample windows that are relatively huge in the aggregate but cheap to compute and unlikely to be needed.

Instead of keeping a collection of sample windows keyed by window identifiers, I instead elected to summarize efforts by recording Effort structures containing mean wattage, activity name, and starting and ending timestamps. In order to do this, I generalized the sliding-window behavior from my ActivitySliding trait, allowing clients to specify their own transformations for individual windows:

excerpted from common.scala
trait ActivitySliding {
  import org.apache.spark.rdd.RDD
  import com.freevariable.surlaplaque.data.Trackpoint

  def windowsForActivities[U](data: RDD[Trackpoint], period: Int, xform: (Trackpoint => U) = identity _) = {
    applyWindowed(data, period, {case (activity, samples, offset) => ((activity, offset), samples.map(xform))})
  }

  def applyWindowed[U: ClassTag](data: RDD[Trackpoint], period: Int, xform: ((String, Seq[Trackpoint], Int) => U)) = {
    val pairs = data.groupBy((tp:Trackpoint) => tp.activity.getOrElse("UNKNOWN"))
    pairs.flatMap {
      case (activity: String, stp:Seq[Trackpoint]) =>
        (stp sliding period).zipWithIndex.map { case (s, i) => xform(activity, s, i) }
    }
  }

  def applyWindowedNoZip[U: ClassTag](data: RDD[Trackpoint], period: Int, xform: ((String, Seq[Trackpoint]) => U)) = {
    val pairs = data.groupBy((tp:Trackpoint) => tp.activity.getOrElse("UNKNOWN"))
    pairs.flatMap {
      case (activity: String, stp:Seq[Trackpoint]) => (stp sliding period).map { xform(activity, _) }
    }
  }

  def identity(tp: Trackpoint) = tp
}

I was then able to use the applyWindowedNoZip function to generate a collection of tuples in which the keys were start-end cluster pairs and the values were Effort structures. I still needed to filter the whole dataset to find the few windows I cared about after identifying the best efforts for each pair of clusters, but that was pretty straightforward. However, I now needed to make sure I cached that RDD of every trackpoint before entering bestsForPeriod in order to avoid reloading them from disk multiple times! Here’s what the final code looked like:

excerpted from power_bests.scala
def bestsForPeriod(data: RDD[Trackpoint], period: Int, app: SLP, model: Broadcast[KMeansModel]) = {
  val clusteredMMPs = applyWindowedNoZip(data, period, {
      case (activity:String, samples:Seq[Trackpoint]) =>
        (
          (closestCenter(samples.head.latlong, model.value), closestCenter(samples.last.latlong, model.value)),
          Effort(samples.map(_.watts).reduce(_ + _) / samples.size, activity, samples.head.timestamp, samples.last.timestamp)
        )
    })

  clusteredMMPs
   .reduceByKey ((a, b) => if (a.mmp > b.mmp) a else b)
   .takeOrdered(20)(Ordering.by[((Int, Int), Effort), Double] { case (_, e:Effort) => -e.mmp })
   .map {
     case (_, e: Effort) => (
       e.mmp,
       data.filter {
         case tp: Trackpoint => tp.activity.getOrElse("UNKNOWN") == e.activity && tp.timestamp <= e.endTimestamp && tp.timestamp >= e.startTimestamp
       }.collect
     )
   }
}

This code is not particularly more difficult to read than the code we started with, but it is substantially faster; it is over 3.7x faster than the prior version and more than 14x faster than my initial prototype (not shown in this post). There are certainly further improvements to make (even some fairly obvious ones), but an important lesson from this exercise is that Spark not only makes it easy to experiment with novel analytic approaches, it also makes it easy to experiment with improved implementations that still retain their declarative elegance.

Conclusions

This post has shown several simple, orthogonal changes to a Spark application that readily compose together for a huge performance improvement. The changes touched on several broader themes you’ll want to keep in mind as you develop your own Spark applications:

  • Avoid shipping large data in closure environments, but instead prefer broadcast variables;
  • Don’t cache indiscriminately, but be sure that the cost of added memory pressure is worth the savings of avoiding recomputation;
  • Understand Spark’s execution model in order to help Spark help you; and
  • Don’t be featured on the next episode of “Big Data Hoarders,” but rather structure your application to embrace laziness and keep only what you need!

Thanks for reading! If you have further tips, you can reply to this post via Twitter.


  1. See this post for a high-level overview of the technique used by this app, along with some discussion of how the data guided the choices I made.

  2. Well, almost all of the metrics. It doesn’t capture heart rate or cadence because I’m not doing anything with those yet, and it doesn’t include speed sensor data because that can be derived from the coordinates of pairs of trackpoints.

  3. Xiangrui Meng proposed an implementation of sliding windows for RDDs earlier this year; the discussion in that link provides some good background on why exposing sliding-window functionality might be a bad idea. (Briefly, the issue is that cross-partition windows are necessarily very expensive, so as the window size approaches — or surpasses — the number of elements in a partition, performance tanks.) A sliding window implementation is currently available as an internal API for MLlib algorithms, though.

  4. This is the most obvious performance pitfall of this approach, and it was obvious from the beginning. Having the windows around, though, made experimenting with my data much easier. Spoiler alert: we’ll look at what happens when we eliminate this liability last.

  5. Advanced Spark users will also note that this statement involves a small lie, since the zipWithIndex method used in windowsForActivities needs to launch a Spark job to appropriately number records across partitions.

  6. The samples themselves are important since we want to plot the paths on which the best efforts occurred.

  7. A great introductory explanation of how Spark executes jobs is Aaron Davidson’s talk from Spark Summit 2014.

  8. Note that these different approaches may have resulted in similar wall-clock times but could have higher or lower utilization — and thus different ultimate performance characteristics — depending on your environment. Be sure to test possible improvements with your code and data!

Consider the collection1 of all contiguous subsequences of a sequence. If we’re talking about a stream of n observations, this could be the multiset of windows containing every possible window over these observations: 1 window of n samples, 2 windows of n-1 samples, 3 windows of n-2 samples, …, and n “windows” of 1 sample each. If we’re talking about a string of symbols, e.g., abcde, then we’d be talking about the following set of substrings:

{
  abcde, 
  abcd, bcde, 
  abc, bcd, cde,
  ab, bc, cd, de,
  a, b, c, d, e
}

Given a sequence of n elements, there will be T(n) subsequences in such a collection, where T(n) is the nth triangular number. If we must store each subsequence without using some representation where two subsequences can share elements they have in common (e.g., if we were to store each as a distinct C-style array), the space requirements to do so explode quickly:

space complexity plot
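
To make the bookkeeping concrete, here’s a quick sketch (the object name is mine) that enumerates the contiguous subsequences of a five-element string and confirms the counts above: T(5) = 15 subsequences, occupying 35 elements if nothing is shared.

Enumerating contiguous subsequences
object PowerSubstrings {
  // every contiguous subsequence of s, longest first
  def substrings(s: String): Seq[String] =
    for {
      len <- s.length to 1 by -1
      start <- 0 to (s.length - len)
    } yield s.substring(start, start + len)

  def main(args: Array[String]): Unit = {
    val subs = substrings("abcde")
    println(subs.mkString("{", ", ", "}"))
    println(subs.size)               // 15, the 5th triangular number
    println(subs.map(_.length).sum)  // 35 characters stored if nothing is shared
  }
}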

Anyway, I’m curious if this concept has a name. I’ve been thinking of it as a “power substring,” analogously to a powerset. If you know of a name for it, please let me know!


  1. I find myself thinking of subsequence identity as depending upon its position in the parent sequence, and thus thinking of this collection as a set even if it could contain multiple subsequences with identical elements.

If you’re like me, you often find yourself pasting transcripts into sbt console sessions in order to interactively test out new app functionality. A lot of times, these transcripts have a great deal of boilerplate and can be dramatically simplified. Fortunately, sbt provides a facility to let you specify what commands should run at the beginning of a console session: the project-specific initialCommands setting.1 sbt also provides a cleanupCommands setting to specify commands to run when your REPL exits, so if you’re testing anything that needs to have some cleanup code run before the JVM terminates, you can have that done automatically as well. (This is also useful to avoid ugly stack traces when developing Spark applications and quitting the console before stopping your SparkContext.) Finally, since sbt build definitions are just Scala code, you can conditionalize these command sets, for example, to only load test fixtures sometimes.

Here’s what this sort of automation looks like in a real build definition, with an excerpt of the Build.scala file from a (not-yet-merged) feature branch on my sur-la-plaque project, showing how I added automation to the REPL for the analysis subproject:2

excerpt of Build.scala
def optionallySetupFixtures = {
  sys.env.get("SLP_FIXTURES_FROM") match {
    case Some(dir: String) => s"""
      |val data = app.processFiles(SLP.listFilesInDir("$dir"))
      |data.registerAsTable("trackpoints")
    """.stripMargin
    case _ => ""
  }
}

def analysisSettings = baseSettings ++ sparkSettings ++ breezeSettings ++ dispatchSettings ++ testSettings ++ Seq(
  initialCommands in console :=
    """
      |import org.apache.spark.SparkConf
      |import org.apache.spark.SparkContext
      |import org.apache.spark.rdd.RDD
      |import com.freevariable.surlaplaque.importer._
      |import com.freevariable.surlaplaque.data._
      |import com.freevariable.surlaplaque.app._
      |
      |val conf = new SparkConf().setMaster("local[8]").setAppName("console").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      |val sc = new SparkContext(conf)
      |val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      |val app = new SLP(sc)
      |import sqlContext._
      |
    """.stripMargin + optionallySetupFixtures,
  cleanupCommands in console := "app.stop"
)

First, I’ve declared a simple optionallySetupFixtures function that generates code to load test data and register it with Spark SQL, but only if SLP_FIXTURES_FROM is set in the environment with the name of a directory containing activity files. The analysisSettings function returns a Seq of settings for the analysis subproject, first combining common settings, test settings, and library-specific settings for its dependencies (these are all declared elsewhere in the file). To this combination, we then add

  1. an initialCommands setting to ensure that our REPL session imports Spark and sur-la-plaque libraries and sets up SparkContext and SQLContext instances, and
  2. a cleanupCommands setting to gracefully shut down the SparkContext when we exit the REPL (via the stop method in the SLP application class).

Note that the initialCommands setting is the result of appending the static settings (our imports and variable declarations) with the result of calling optionallySetupFixtures, which will either be code to load and register our data or nothing, depending on our environment.

This functionality makes it easy to develop custom REPL environments or just save a lot of time while interactively experimenting with new techniques in your project. Even better, the investment required is absolutely minimal compared to the payoff of not having to paste or type boilerplate code into every REPL session.


  1. This feature is mentioned in the documentation and used by Apache Spark, so I was familiar with it, but — for whatever reason — I hadn’t thought to apply it to my own projects until recently. I’m mentioning it here in case you didn’t think of it either!

  2. sur-la-plaque is a collection of applications dedicated to making sense of bicycling activity data; you can read more about some of the tools it includes here. The code is currently structured in two sbt projects: one for analysis code that actually processes data, and one that provides a web-based viewer of analysis results.

In earlier posts we introduced the concepts of type widening and type translation, discussed support for these in existing database systems, and presented a general approach to implementing type widening. In this post, we’ll extend our simple interpreter with support for type translations.

Adding functions to SimpleInterpreter

While we could implement type translation for AddExpression, we’d want to do so in such a way as to resolve the ambiguity inherent in adding a StringType and an IntType: should we coerce the StringType to an integer value or convert the IntType to a string representation? Put another way, should "12" + 3 evaluate to "123" or 15?1 While this is an interesting aesthetic question, we’ll not treat it further in this post.

Instead, we’ll extend our simple interpreter trait with support for functions:

LessSimpleInterpreter.scala
object LessSimpleInterpreter extends SimpleInterpreter {

  class BinaryFunction[L,R,Result](val lhs: Expression[L], val rhs: Expression[R], val f: ((L, R) => Result))
      extends Expression[Result] {
    type left = L
    type right = R
    type result = Result

    def eval: Value[Result] = {
      val lv = lhs.eval
      val rv = rhs.eval
      new Value[Result](f(lv.get, rv.get)) {}
    }
  }

  object BinaryFunction {
    def apply[L,R,Result](lhs: Expression[L], rhs: Expression[R], f: ((L, R) => Result)) =
      new BinaryFunction(lhs, rhs, f)

    def unapply[L,R,Result](bf: BinaryFunction[L,R,Result]):
        Option[(Expression[L],Expression[R],((L, R) => Result))] =
      Some((bf.lhs, bf.rhs, bf.f))
  }
}

Notice that we don’t declare BinaryFunction as a case class, since we might want to declare other case classes that inherit from it.2 We could thus use BinaryFunction directly, like this:

BinaryFunction example
scala> import LessSimpleInterpreter._
import LessSimpleInterpreter._

scala> val max = BinaryFunction(IntValue(4), IntValue(5), ((a:Int, b:Int) => if (a > b) a else b))
max: LessSimpleInterpreter.BinaryFunction[Int,Int,Int] = LessSimpleInterpreter$BinaryFunction@301ec38b

scala> max.eval.get
res0: Int = 5

Alternatively, we could implement subclasses of BinaryFunction with baked-in function parameters, as follows:

MaxFunction.scala
import LessSimpleInterpreter._

class MaxFunction[T, Result](lhs: Expression[T], rhs: Expression[T])
    (implicit ev: T => Ordered[T])
    extends BinaryFunction(lhs, rhs, ((a:T, b:T) => if (a > b) a else b)) { }

Note that, in this case (with no type coercion), we use a single class representing both a function and a function application. We’ll be changing that shortly.

Combining widening and function application

We’ll first produce an interpreter that combines both type widening and function application, using our “unzipped” widening interpreter, which finds widenings one parameter at a time, as a starting point.3 In the following example, we’ve created separate classes representing function definitions (BinaryFunction) and function application expressions (BinaryFunctionAppl). Each function definition has formal parameter types (that is, the parameter types it expects) and a result type; each function application has actual parameter types, which are the types of the expressions supplied as actual parameters. In a function application, we denote the formal parameter types as Lf and Rf and the actual parameter types as La and Ra. (As usual, these types can be equal, but they need not be.) Just like Apache Hive does when automatically wrapping UDF functions in the GenericUDF interface, we define the formal parameter types based on the formal parameter types of the underlying function implementation.

WideningFunctionInterpreter.scala
object WideningFunctionInterpreter extends UnzippedWideningInterpreter {
  class BinaryFunction[L <: Type, R <: Type, Result <: Type]
      (fun: (L#nativeType, R#nativeType) => Result#nativeType) {

    def apply(lhs: L#nativeType, rhs: R#nativeType): Result#nativeType = fun(lhs, rhs)
  }

  class BinaryFunctionAppl[La <: Type, Ra <: Type, Lf <: Type, Rf <: Type, Result <: Type]
      (val lhs: Expression[La], val rhs: Expression[Ra], val f: BinaryFunction[Lf, Rf, Result])
      (implicit lconv: Value[La] => Value[Lf], rconv: Value[Ra] => Value[Rf])
      extends Expression[Result] {
    type left = Lf
    type right = Rf
    type result = Result

    def eval: Value[Result] = {
      val lv = lconv(lhs.eval)
      val rv = rconv(rhs.eval)
      new Value[Result](f(lv.get, rv.get)) {}
    }
  }
}

In order to have a valid function application, we need two witness functions (here called lconv and rconv) to convert values of the actual parameter types La and Ra to values of the formal parameter types Lf and Rf. We can see how this works in practice by defining a maximum function over DoubleType values and applying it to IntType values:

WideningFunctionInterpreter example
scala> import WideningFunctionInterpreter._
import WideningFunctionInterpreter._

scala> val max = new BinaryFunction[DoubleType,DoubleType,DoubleType]((x: Double, y: Double) => if (x > y) x else y)
max: WideningFunctionInterpreter.BinaryFunction[WideningFunctionInterpreter.DoubleType,WideningFunctionInterpreter.DoubleType,WideningFunctionInterpreter.DoubleType] = WideningFunctionInterpreter$BinaryFunction@72a7aa4f

scala> val appl = new BinaryFunctionAppl(IntValue(5), IntValue(7), max)
appl: WideningFunctionInterpreter.BinaryFunctionAppl[WideningFunctionInterpreter.IntType,WideningFunctionInterpreter.IntType,WideningFunctionInterpreter.DoubleType,WideningFunctionInterpreter.DoubleType,WideningFunctionInterpreter.DoubleType] = WideningFunctionInterpreter$BinaryFunctionAppl@1a536164

scala> val result = appl.eval.get
result: WideningFunctionInterpreter.DoubleType#nativeType = 7.0

Note that the problem of widening actuals to formals is even more straightforward than finding the least upper bound type of two operands in an addition, since we know what the formals are expected to be and thus know immediately whether we have relevant widenings or not.

Adding translation

The final enhancement to our simple interpreter is to add type translation. Since we can treat type widening as a limited case of type translation, our approach will handle both. For the sake of a straightforward example, we’ll simply allow converting from string values to doubles (à la Hive and other untyped database systems) as well as converting from a value to one of a wider type.

We’ll define a trait called ↪ and declare witness instances A ↪ B if there is a way to translate from a Value[A] to a Value[B]. Since we can’t statically guarantee that type translations will succeed in general, A ↪ B will be implemented as a partial function from Value[A] to Value[B].

TranslatingFunctionInterpreter.scala
import scala.util.{Try, Success}

object TranslatingFunctionInterpreter extends UnzippedWideningInterpreter {
  trait ↪[A <: Type, B <: Type] extends PartialFunction[Value[A], Value[B]]

  implicit def wideningTranslation[A <: Type, B <: Type]
       (implicit ev: A#nativeType => B#nativeType): ↪[A, B] = new ↪[A, B] {
     def apply(a: Value[A]): Value[B] = new Value[B](ev(a.get))
     def isDefinedAt(a: Value[A]) = true
  }

  implicit object StringToDouble extends ↪[StringType,DoubleType] {
    def apply(s: Value[StringType]) = {
      DoubleValue(Try(s.get.toDouble).toOption.get)
    }

    def isDefinedAt(s: Value[StringType]) = {
      Try(s.get.toDouble) match {
        case Success(_) => true
        case _ => false
      }
    }
  }

  class BinaryFunction[L <: Type, R <: Type, Result <: Type]
      (fun: (L#nativeType, R#nativeType) => Result#nativeType) {

    def apply(lhs: L#nativeType, rhs: R#nativeType): Result#nativeType = fun(lhs, rhs)
  }

  class BinaryFunctionAppl[La <: Type, Ra <: Type, Lf <: Type, Rf <: Type, Result <: Type]
      (val lhs: Expression[La], val rhs: Expression[Ra], val f: BinaryFunction[Lf, Rf, Result])
      (implicit lconv: La ↪ Lf, rconv: Ra ↪ Rf)
      extends Expression[Result] {
    type left = Lf
    type right = Rf
    type result = Result

    def eval: Value[Result] = {
      val lv = lconv(lhs.eval)
      val rv = rconv(rhs.eval)
      new Value[Result](f(lv.get, rv.get)) {}
    }
  }
}

Once we’ve implemented the partial functions to convert strings to doubles, our interpreter is very similar to the type-widening interpreter. We can see that it works by attempting to take the maximum of a double and a string representation of a double:

TranslatingFunctionInterpreter example
scala> import TranslatingFunctionInterpreter._
import TranslatingFunctionInterpreter._

scala> val max = new BinaryFunction[DoubleType,DoubleType,DoubleType]((x: Double, y: Double) => if (x > y) x else y)
max: TranslatingFunctionInterpreter.BinaryFunction[TranslatingFunctionInterpreter.DoubleType,TranslatingFunctionInterpreter.DoubleType,TranslatingFunctionInterpreter.DoubleType] = TranslatingFunctionInterpreter$BinaryFunction@4ab550d5

scala> val appl = new BinaryFunctionAppl(DoubleValue(5.0), StringValue("7.0"), max)
appl: TranslatingFunctionInterpreter.BinaryFunctionAppl[TranslatingFunctionInterpreter.DoubleType,TranslatingFunctionInterpreter.StringType,TranslatingFunctionInterpreter.DoubleType,TranslatingFunctionInterpreter.DoubleType,TranslatingFunctionInterpreter.DoubleType] = TranslatingFunctionInterpreter$BinaryFunctionAppl@6e1b9411

scala> appl.eval.get
res0: TranslatingFunctionInterpreter.DoubleType#nativeType = 7.0

In the remainder of this post, we’ll sketch a couple of simple extensions to this basic approach.

Dealing with failures

Note that our interpreter will crash if asked to evaluate a function application for which type translation fails. Failure is probably fine in these cases for general-purpose programming languages (although we’d probably want to model failure in the interpreted language or otherwise do something nicer than terminating with a Scala MatchError because a partial function isn’t defined for its input). However, if we’re interested in emulating database query languages, we have another option for dealing with translation failures: simply return a null value.

Since most database query language functions — both built-in and user-defined — must handle null values appropriately, this wouldn’t necessarily complicate function implementations any further. It would require some tweaks to the way we’ve ascribed types to this program; in particular, translation functions and function applications would have to return either a value of the translated type or null (if translation failed).4
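
One way to make this concrete without touching the existing eval signature is a standalone application helper that lifts the translation partial functions and reports failure as an Option — a sketch of one of the options from the footnote below. The helper and its name are mine, not part of the interpreters above.

Sketch: surfacing translation failure as an Option
import TranslatingFunctionInterpreter._

class SafeBinaryFunctionAppl[La <: Type, Ra <: Type, Lf <: Type, Rf <: Type, Result <: Type]
    (val lhs: Expression[La], val rhs: Expression[Ra], val f: BinaryFunction[Lf, Rf, Result])
    (implicit lconv: La ↪ Lf, rconv: Ra ↪ Rf) {

  // lift turns each partial translation into a total function returning Option,
  // so an inapplicable translation yields None instead of a MatchError
  def evalOption: Option[Value[Result]] =
    for {
      lv <- lconv.lift(lhs.eval)
      rv <- rconv.lift(rhs.eval)
    } yield new Value[Result](f(lv.get, rv.get)) {}
}

With this helper, applying the max function from the earlier example to DoubleValue(5.0) and StringValue("oops") should evaluate to None rather than crashing.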

Supporting polymorphism

One interesting remaining question is this: can we extend our interpreter to allow encoding simple generic functions? By “simple” generic functions, I mean those whose parameter types may consist of either concrete types or type variables, but whose output type is either not a type variable or is the same as one of its input types. As a concrete example, let’s say we wanted a function pow[X](base: X, exp: X): X where X could be either a double-precision floating-point value or an arbitrary-precision decimal — but, in either case, the result type would be the same as the actual parameter types.

Assuming the existence of appropriate object-language encodings for type constraints, subtyping, and substitutability relations between interpreter types, we could proceed via iterative application of widening and translation until we reached a fixed point. (This process would terminate so long as the widening and translation relations imposed a complete partial order on the set of interpreted-language types.)

However, we’d likely not need such generality to implement polymorphic functions in a query language. Apache Hive, for example, allows coercing strings to either DECIMAL or DOUBLE values. So a simple approach would be to find the narrowest of those two types that string arguments could be coerced to (call this type TLub) and then widen all other arguments of type X to TLub.5

A parting thought

The interpreters presented in these posts took care to get things implemented safely (OK, as safely as practical) in Scala’s type system. There are almost certainly better ways to statically capture the safety properties we might care about. However, if we were in a more dynamic context — such as in an untyped language, or in a trivially-typed fragment of a Scala program6 — many of these approaches would admit terser implementations.


  1. Some related questions: should "12" + 3 evaluate to the same thing as 12 + "3"? Should both be legal? Java allows the former (evaluating it as string concatenation by invoking .toString on an automatically-boxed 3), but not the latter.

  2. We mimic some (but not all) case class functionality here. We have declared an apply method on the companion object, to allow constructing BinaryFunction instances without new, and an unapply method to allow matching and deconstructing BinaryFunction instances.

  3. See this post for more details on this approach.

  4. Given NullType extending Type, we’d have a few options: we could return an instance of Scala’s Either[A <: Value[_], Value[NullType]]; we could convert all interpreter eval methods to return an Option[Value[_]]; we could relax static typing on function return values; etc.

  5. As a further practical consideration, many of Hive’s numeric functions that accept either DECIMAL or DOUBLE values only operate on double-precision values and thus narrow DECIMAL arguments internally at evaluation time.

  6. Consider, for example, representing all interpreter values as Option[Any] and using explicit casts.

In this installment of our series on type coercions, we’re going to introduce a way to support type widening in a language interpreter. We’ll present a general approach based on semilattices of types and a particular implementation of this approach that uses a straightforward encoding in Scala’s type system. We’ll work from a simple interpreter to allow for a clear exposition, but our general techniques will be applicable to more involved languages as well.

Widening functions

A widening function maps values from some type T to a wider type U. We can implement a trivial but generic type-widening method based on the partial ordering of types encoded in Scala’s type system:

Widening example
scala> def widen[T,U](t: T)(implicit w: T => U): U = w(t)

scala> val longFive = widen[Int,Long](5)
longFive: Long = 5

scala> val doubleFive = widen[Int,Double](5)
doubleFive: Double = 5.0

scala> val arbitraryFive = widen[Int,BigDecimal](5)
arbitraryFive: BigDecimal = 5

Invoking widen[A,B] on an argument of type A will succeed if there is a witness object for A => B. By default, we’ll be able to see the predefined widenings in Scala, including reflexive widenings. Note that there are no implicitly defined narrowings, though:

Exploring witness objects for predefined widenings
scala> implicitly[Int => Double]
res0: Int => Double = <function1>

scala> implicitly[Int => Int]
res1: Int => Int = <function1>

scala> implicitly[Double => Int]
<console>:8: error: No implicit view available from Double => Int.
              implicitly[Double => Int]
                        ^

It’s important to note that we could declare other witnesses to A => B for other A and B types and have them in scope; we aren’t constrained by Scala’s predefined definitions or by static relationships between implementation types. (We’ll come back to this point later, when we’re thinking about how to model types in the interpreted language.)
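
For instance, here’s a tiny sketch (not from the post’s repository; the names are mine) that brings a hand-rolled witness into scope so that widen can convert between types Scala has no predefined view for:

Supplying a custom widening witness
object CustomWitness {
  def widen[T, U](t: T)(implicit w: T => U): U = w(t)

  // Scala predefines no Int => String view, but we can declare our own witness
  implicit val intAsString: Int => String = i => i.toString

  def main(args: Array[String]): Unit =
    println(widen[Int, String](42))  // prints "42"
}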

A simply-typed interpreter

We’ll start with a simple interpreter with four kinds of values (integers, doubles, strings, and nulls) and one kind of expression representing numeric addition or string concatenation, depending on the types of its operands. It is a stretch to call the language embodied in this interpreter “typed” (since it has only literal values and expressions but no variables). However, because of the way we’ve encoded the interpreter in Scala, it is impossible to express programs with runtime errors. In particular, it is only possible to create an AddExpression with two arguments that evaluate to values of the same type.

SimpleInterpreter.scala
trait SimpleInterpreter {

  trait Addable[T] {
    def plus(self: T, other: T): Value[T]
  }

  implicit object IntAddable extends Addable[Int] {
    def plus(self: Int, other: Int) = IntValue(self + other)
  }

  implicit object DoubleAddable extends Addable[Double] {
    def plus(self: Double, other: Double) = DoubleValue(self + other)
  }

  implicit object StringAddable extends Addable[String] {
    def plus(self: String, other: String) = StringValue(self + other)
  }

  abstract class Expression[T] {
    def eval: Value[T]
  }

  abstract class Value[T](v: T) extends Expression[T] {
    def eval: Value[T] = this
    def get: T = v
  }

  case class IntValue(v: Int) extends Value(v) {}
  case class DoubleValue(v: Double) extends Value(v) {}
  case class StringValue(v: String) extends Value(v) {}
  case object NullValue extends Value(null) {}

  case class AddExpression[T](lhs: Expression[T], rhs: Expression[T])(implicit ev: Addable[T]) extends Expression[T] {
    def eval: Value[T] = {
      val lv = lhs.eval
      val rv = rhs.eval
      ev.plus(lv.get, rv.get)
    }
  }
}

object SimpleInterpreter extends SimpleInterpreter { }

Adding widening

If we have an expression of the form t1 • t2, where the left-hand side is of type T1 and the right-hand side is of type T2, we will be able to convert this to an expression in which both operands have the same type if the following conditions are met:

  1. There must exist some type U that is at least as wide as both T1 and T2, and
  2. There must exist widening functions with the signatures T1 ⇒ U and T2 ⇒ U.

Finding U is simply finding the least upper bound of T1 and T2 on a semilattice of types. Once we have this least upper bound, if we also have appropriate widening functions, we can convert both t1 and t2 to values of the same type. In the following code, we extend our simple interpreter by modeling interpreter types in Scala, making types properties of values, and adding widening conversions to AddExpression by explicitly encoding the partial ordering of types — in this case, based on a relationship between the nativeType type member of each Type class. (We’re doing widening statically through Scala’s type system in this case, but there’s no reason why we couldn’t take a similar approach dynamically, handling errors by raising exceptions at runtime.)

WideningInterpreter.scala
object WideningInterpreter {
  import scala.language.implicitConversions

  sealed abstract class Type {
    type nativeType <: Any
  }
  class IntType extends Type {
    override type nativeType = Int
  }
  class DoubleType extends Type {
    override type nativeType = Double
  }
  class StringType extends Type {
    override type nativeType = String
  }

  abstract class Expression[K <: Type] {
    def eval: Value[K]
  }

  class Value[K <: Type](v: K#nativeType) extends Expression[K] {
    def eval: Value[K] = this
    def get: K#nativeType = v
  }

  case class IntValue(v: Int) extends Value[IntType](v) {}
  case class DoubleValue(v: Double) extends Value[DoubleType](v) {}
  case class StringValue(v: String) extends Value[StringType](v) {}

  sealed trait Addable[K <: Type] {
    def plus(self: K#nativeType, other: K#nativeType): Value[K]
  }

  implicit object IntAddable extends Addable[IntType] {
    def plus(self: Int, other: Int) = IntValue(self + other)
  }

  implicit object DoubleAddable extends Addable[DoubleType] {
    def plus(self: Double, other: Double) = DoubleValue(self + other)
  }

  implicit object StringAddable extends Addable[StringType] {
    def plus(self: String, other: String) = StringValue(self + other)
  }

  // We need some way to constrain our generic widening operators so
  // that an expression with identical operand types won't have an
  // ambiguous implicit argument.  One way to do this is to make sure
  // that one of the widening functions will only apply if the arguments
  // are of different types.  

  // These type inequality instances are taken from an answer Miles Sabin 
  // gave on StackOverflow:  http://stackoverflow.com/a/6944070

  trait =!=[A, B]
  implicit def neq[A, B] : A =!= B = null
  implicit def neqAmbig1[A] : A =!= A = null
  implicit def neqAmbig2[A] : A =!= A = null

  implicit def leftWiden[T <: Type, U <: Type](v1: Value[T], v2: Value[U])
      (implicit conv: (T#nativeType => U#nativeType)): (Value[U], Value[U]) =
    (new Value[U](conv(v1.get)), v2)

  implicit def rightWiden[T <: Type, U <: Type](v1: Value[U], v2: Value[T])
      (implicit neq: T =!= U,
                conv: (T#nativeType => U#nativeType)): (Value[U], Value[U]) =
    (v1, new Value[U](conv(v2.get)))

  case class AddExpression[T <: Type, U <: Type, V <: Type]
      (lhs: Expression[T], rhs: Expression[U])
      (implicit widen: (Value[T], Value[U]) => (Value[V], Value[V]), adder: Addable[V]) extends Expression[V] {
    def eval = {
      val lv = lhs.eval
      val rv = rhs.eval
      val args = widen(lv, rv)
      adder.plus(args._1.get, args._2.get)
    }
  }
}

In WideningInterpreter we extend AddExpression by allowing it to have two potentially distinct argument types (Value[T] and Value[U]) and by requiring evidence of an implicit conversion from a pair of values with distinct types to a pair of values with the same type.1 We define two witness functions, leftWiden for the case in which the left element of the pair is narrower than the right, and rightWiden for the case in which the right element of the pair is narrower than the left. In both cases, we determine that a type T is narrower than another type U if Scala knows how to widen values of the representation type (Type#nativeType) of T to the representation type of U; this is the case if an implicit resolution exists for the conv argument.

The problem we might encounter is that, because our partial ordering is reflexive, if T and U are the same type, then there will be witnesses both for T#nativeType => U#nativeType and U#nativeType => T#nativeType. So if we were to have naive implementations of leftWiden and rightWiden that only depended on evidence of such a conversion, Scala would be unable to unambiguously resolve which would apply in the case of monomorphic AddExpressions. We resolve this problem by adding a test for type inequality (due to Miles Sabin) to the implicit argument list of rightWiden, so that it will not apply if the arguments are of the same type.2

Note that the partial ordering among interpreter types (IntType, DoubleType, and StringType) does not depend on Scala-level subtyping relationships between interpreter types. This is important because in a more realistic language we will want the flexibility to model data types independently of properties of our object-language implementation. Instead, we have a generic partial ordering in this example based on predefined relationships between representation types, and we could extend the partial ordering to other types by bringing other implicit A => B conversions into scope for the types of interest.
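
Putting the pieces together, here’s a small sketch of how I’d expect AddExpression to behave with mixed and matching operand types (the object and value names below are mine):

Exercising AddExpression in WideningInterpreter
import WideningInterpreter._

object WideningDemo {
  def main(args: Array[String]): Unit = {
    // mixed operand types: the Int side is widened to Double via leftWiden
    val mixed = AddExpression(IntValue(1), DoubleValue(2.5))
    println(mixed.eval.get)  // 3.5

    // identical operand types: rightWiden is ruled out by the =!= evidence,
    // so there is no ambiguity and the result stays an Int
    val same = AddExpression(IntValue(1), IntValue(2))
    println(same.eval.get)   // 3
  }
}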

For a small number of interpreter types, we could also explicitly encode the partial ordering, as in the example below:

A more explicit partial ordering encoding
implicit def intDoubleWiden(v1: Value[IntType], v2: Value[DoubleType]): (Value[DoubleType], Value[DoubleType]) =
  (DoubleValue(v1.get.toDouble), v2)

implicit def doubleIntWiden(v1: Value[DoubleType], v2: Value[IntType]): (Value[DoubleType], Value[DoubleType]) =
  (v1, DoubleValue(v2.get.toDouble))

Since in this example we have a total ordering among types, we can also easily widen one argument at a time by adding a witness object for the least upper bound of T and U,3 as in the example below:

A partial ordering encoding that finds appropriate widenings one argument at a time
implicit def widen[T <: Type, U <: Type](v1: Value[T])
    (implicit conv: (T#nativeType => U#nativeType)): (Value[U]) =
  new Value[U](conv(v1.get))

implicit def reflexiveWiden[T <: Type](v1: Value[T]): Value[T] = v1

trait LT[T <: Type, U <: Type] {}
implicit def intDoubleLT: LT[IntType, DoubleType] = null

trait WiderThan[T <: Type, U <: Type, V <: Type] {}
implicit def rightWider[T <: Type, U <: Type, V <: Type]
  (implicit rw: LT[T, U],
            conv2: (U#nativeType => V#nativeType),
            conv1: (T#nativeType => V#nativeType)): WiderThan[T,U,V] = null

implicit def leftWider[T <: Type, U <: Type, V <: Type]
  (implicit rw: LT[U, T],
            conv2: (T#nativeType => V#nativeType),
            conv1: (U#nativeType => V#nativeType)): WiderThan[T,U,V] = null

implicit def reflWider[T <: Type]: WiderThan[T, T, T] = null

case class AddExpression[T <: Type, U <: Type, V <: Type]
    (lhs: Expression[T], rhs: Expression[U])
    (implicit lub: WiderThan[T, U, V],
              widenLeft: Value[T] => Value[V],
              widenRight: Value[U] => Value[V],
              adder: Addable[V]) extends Expression[V] {
  def eval = {
    val lv = widenLeft(lhs.eval)
    val rv = widenRight(rhs.eval)
    adder.plus(lv.get, rv.get)
  }
}

Again, a similar approach would be applicable in an untyped Scala representation of interpreter-language types: we could represent types as terms, implement the least-upper-bound relation as a partial function mapping from a pair of terms to the least upper bound of the pair, and implement widenings as functions taking a value and a term representing the type to widen to.
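
Here’s a rough sketch of what that dynamic flavor might look like (illustrative only, and sharing no code with the interpreters above): type terms are ordinary values, the least-upper-bound relation is a partial function over pairs of terms, and widening takes a value along with the source and target terms.

Sketch: dynamically-checked widening over type terms
object DynamicWidening {
  sealed trait Ty
  case object IntTy extends Ty
  case object DoubleTy extends Ty
  case object StringTy extends Ty

  // least upper bound as a partial function over pairs of type terms
  val lub: PartialFunction[(Ty, Ty), Ty] = {
    case (a, b) if a == b  => a
    case (IntTy, DoubleTy) => DoubleTy
    case (DoubleTy, IntTy) => DoubleTy
  }

  // widen a value of type `from` to the type denoted by `to`
  def widen(from: Ty, v: Any, to: Ty): Any = (from, to, v) match {
    case (f, t, x) if f == t       => x
    case (IntTy, DoubleTy, x: Int) => x.toDouble
    case other                     => sys.error(s"no widening for $other")
  }

  // widen both operands to their least upper bound, then add them
  def add(lt: Ty, lv: Any, rt: Ty, rv: Any): (Ty, Any) = {
    val t = lub((lt, rt))  // throws a MatchError if no upper bound is known
    val sum = (widen(lt, lv, t), widen(rt, rv, t)) match {
      case (a: Int, b: Int)       => a + b
      case (a: Double, b: Double) => a + b
      case (a: String, b: String) => a + b
    }
    (t, sum)
  }
}

With these definitions, DynamicWidening.add(IntTy, 1, DoubleTy, 2.5) yields (DoubleTy, 3.5), and the errors that the static version rejects at compile time show up here as runtime exceptions instead.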


  1. See the signature for the widen implicit argument to AddExpression: (Value[T], Value[U]) => (Value[V], Value[V])

  2. This type-inequality test will fail if it is given two identical types because both neqAmbig1 and neqAmbig2 will be applicable.

  3. It occurred to me while developing these examples that using witness objects in this way is a lot like forward-chaining logic programming. (Note that we use negation-as-failure in implicit resolution when testing for type inequality and we use implicit arguments to guide further implicit resolution with the WiderThan witness.) Unsurprisingly, it turns out that other people have had the same idea! See the discussion on this post or this talk for two examples.

In my last post, I introduced two kinds of implicit type coercions that can appear in database query languages: type widenings, in which values are converted to wider types (e.g. from an int to a long or double), and type translations, in which a value of some type T might be converted to one of an unrelated type U if it is used where a value of U is expected. In this post, we’ll look at what sort of type coercions are available in Apache Hive and (in less detail) Microsoft SQL Server.

Implicit conversions in Apache Hive

Apache Hive features several kinds of types, many of which are also present in ANSI SQL with similar definitions:

  1. hardware-supported integral types, such as tinyint (one byte), smallint (two bytes), int (four bytes), and bigint (eight bytes);
  2. hardware-supported floating-point types, such as float (single-precision, four bytes) and double (double-precision, eight bytes);
  3. decimal values (38 digits precision in Hive 0.11 and 0.12; arbitrary-precision in Hive 0.13.0 and later);
  4. date and time types, such as timestamp and date;
  5. string types, including string (of arbitrary length), varchar[N] (of arbitrary length but less than N characters), and char[N] (of exactly N characters, possibly padded with spaces);
  6. boolean values;
  7. binary values (sequences of bytes); and
  8. compound values made up of Hive types: homogeneous arrays with some element type, maps containing keys of one type and values of another, and C-style struct and union types.

Hive supports some widenings and narrowings between these types.1 Among the hardware-supported numeric types, values can be widened but not narrowed.2 Strings can be narrowed to be used as varchar values; converting a string value to a varchar[N], where N is insufficient to hold the contents of the string, will cause the string to be truncated to N characters. It is also possible (as of Hive 0.13) to supply a decimal argument to many numeric functions that expect a double input, although in most cases the function will only process a double approximating the supplied arbitrary-precision value.

Hive also supports type translations to and from string values. Hive permits implicitly converting a value of any type (with the exception of boolean and binary) to a string. String representations of double or decimal values (but not the smaller integral or floating-point types) can also be converted to values of those types.

Hive supports widenings as part of object comparisons; the FunctionRegistry.getCommonClassForComparison method returns the least upper bound of two types. The code excerpt below shows how Hive also explicitly encodes which widenings and translations are permissible:

excerpted from Hive 0.12’s FunctionRegistry.java
public static boolean implicitConvertable(PrimitiveCategory from, PrimitiveCategory to) {
  if (from == to) {
    return true;
  }

  PrimitiveGrouping fromPg = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(from);
  PrimitiveGrouping toPg = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(to);

  // Allow implicit String to Double conversion
  if (fromPg == PrimitiveGrouping.STRING_GROUP && to == PrimitiveCategory.DOUBLE) {
    return true;
  }
  // Allow implicit String to Decimal conversion
  if (fromPg == PrimitiveGrouping.STRING_GROUP && to == PrimitiveCategory.DECIMAL) {
    return true;
  }
  // Void can be converted to any type
  if (from == PrimitiveCategory.VOID) {
    return true;
  }

  // Allow implicit String to Date conversion
  if (fromPg == PrimitiveGrouping.DATE_GROUP && toPg == PrimitiveGrouping.STRING_GROUP) {
    return true;
  }
  // Allow implicit Numeric to String conversion
  if (fromPg == PrimitiveGrouping.NUMERIC_GROUP && toPg == PrimitiveGrouping.STRING_GROUP) {
    return true;
  }
  // Allow implicit String to varchar conversion, and vice versa
  if (fromPg == PrimitiveGrouping.STRING_GROUP && toPg == PrimitiveGrouping.STRING_GROUP) {
    return true;
  }

  // Allow implicit conversion from Byte -> Integer -> Long -> Float -> Double
  // Decimal -> String
  Integer f = numericTypes.get(from);
  Integer t = numericTypes.get(to);
  if (f == null || t == null) {
    return false;
  }
  if (f.intValue() > t.intValue()) {
    return false;
  }
  return true;
}

To see how Hive actually performs type coercions, we’ll have to take a step back and look at Hive’s architecture for defining functions.3 Hive has two interfaces for defining functions: UDF, which models a simple function with simply-typed arguments and a simply-typed return value, and GenericUDF, which models functions that can operate on and return values of compound types.

Subclasses of UDF include at least one method called evaluate (of arbitrary argument and return types); this is what gets called when the user-defined function is evaluated. Due to their flexible signatures, these evaluate methods are not specified in any interface and are instead found via Java reflection. By contrast, a GenericUDF must support an initialize method that takes an array of ObjectInspector instances (essentially adapters from arbitrary types to concrete object values) and an evaluate method taking an array of DeferredObject instances (essentially futures representing objects).

The initialize method in GenericUDF is invoked with ObjectInspector instances corresponding to actual parameters; if the actuals aren’t implicitly convertible to the proper types, it will fail. Otherwise, it will return an ObjectInspector instance for the return type. As a simple example, see the initialize method in the class providing Hive’s implementation of the SQL CONCAT function:

excerpted from Hive 0.12’s GenericUDFConcatWS.java
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
  if (arguments.length < 2) {
    throw new UDFArgumentLengthException(
        "The function CONCAT_WS(separator,[string | array(string)]+) "
          + "needs at least two arguments.");
  }

  // check if argument is a string or an array of strings
  for (int i = 0; i < arguments.length; i++) {
    switch(arguments[i].getCategory()) {
      case LIST:
        if (isStringOrVoidType(
            ((ListObjectInspector) arguments[i]).getListElementObjectInspector())) {
          break;
        }
      case PRIMITIVE:
        if (isStringOrVoidType(arguments[i])) {
        break;
        }
      default:
        throw new UDFArgumentTypeException(i, "Argument " + (i + 1)
          + " of function CONCAT_WS must be \"" + serdeConstants.STRING_TYPE_NAME
          + " or " + serdeConstants.LIST_TYPE_NAME + "<" + serdeConstants.STRING_TYPE_NAME
          + ">\", but \"" + arguments[i].getTypeName() + "\" was found.");
    }
  }

  argumentOIs = arguments;
  return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
}

Note that the above verifies both the correct number of arguments and the correct types of each argument before returning an ObjectInspector instance for writable strings. The evaluate method then invokes DeferredObject.get() on each argument, converts them to String values using built-in coercions, and concatenates them together, returning the result as a text value.

Plain UDF instances and GenericUDF instances alike are stored in Hive’s function registry, but the former are converted to GenericUDF instances first by wrapping them in GenericUDFBridge, which is a proxy that uses Java introspection on the underlying UDF instance to determine what a function’s expected argument types are; it can then convert actual parameters to values of appropriate types using built-in coercions at execution time.

Implicit conversions in Microsoft SQL Server

While we can’t examine conversions supported in Microsoft SQL Server in as great detail as we can with Apache Hive (since the source for SQL Server isn’t available), the published documentation indicates which conversions are supported. In brief, SQL Server supports most of the same kinds of type coercions as Hive, with the following additions:

  1. bidirectional implicit translation from char[N] and varchar[N] to all numeric types (not merely double and decimal, as in Hive);
  2. financial types (money and smallmoney) are supported and can be implicitly translated to and from numeric types;
  3. bidirectional implicit translation between timestamp values and both character and integral types;
  4. the sql_variant type, which can receive values of most types via implicit conversions but must be converted with an explicit CAST in contexts expecting a value of a different type; and
  5. various other types (xml, uniqueidentifier, and user-defined types from the CLR) with varying conversion semantics.

These additions are useful but their absence does not limit Hive’s expressive power. In the next post in this series, we’ll look at a general approach to implementing type widening, along with a specific (and statically-safe) realization of this approach using Scala’s type system.


  1. The Hive wiki includes a full conversion matrix.

  2. For example, it is permissible to use a tinyint where an int or double is expected, but not vice versa.

  3. Actually implementing new functions in Hive is outside the scope of this post, but there are lots of resources online if you’re interested. In particular, Matthew Rathbone has a great article about extending Hive with new functions.