Lately, I’ve been experimenting with Spark’s implementation of word2vec. Since most of the natural-language data I have sitting around these days are service and system logs from machines at work, I thought it would be fun to see how well word2vec worked if we trained it on the text of log messages. This is obviously pretty far from an ideal training corpus, but these brief, rich messages seem like they should have some minable content. In the rest of this post, I’ll show some interesting results from the model and also describe some concrete preprocessing steps to get more useful results for extracting words from the odd dialect of natural language that appears in log messages.

Background

word2vec is a family of techniques for encoding words as relatively low-dimensional vectors that capture interesting semantic information. That is, words that are synonyms are likely to have vectors that are similar (by cosine similarity). Another really neat aspect of this encoding is that linear transformations of these vectors can expose semantic information like analogies: for example, given a model trained on news articles, adding the vectors for “Madrid” and “France” and subtracting the vector for “Spain” results in a vector very close to that for “Paris.”

Spark’s implementation of word2vec uses skip-grams, so the training objective is to produce a model that, given a word, predicts the context in which it is likely to appear.
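
As a concrete illustration, here’s a rough sketch of how you might pose the analogy query described above against a trained Spark MLlib Word2VecModel (the model and the news corpus are assumptions for the sake of the example, not something we’ll build in this post):

import org.apache.spark.mllib.feature.Word2VecModel
import org.apache.spark.mllib.linalg.Vectors

// vec(a) + vec(b) - vec(c), computed elementwise on the raw arrays
def analogy(model: Word2VecModel, a: String, b: String, c: String) = {
  val Seq(va, vb, vc) = Seq(a, b, c).map(w => model.transform(w).toArray)
  val combined = va.indices.map(i => va(i) + vb(i) - vc(i)).toArray
  model.findSynonyms(Vectors.dense(combined), 5)
}

// with a model trained on news articles, analogy(model, "Madrid", "France", "Spain")
// should rank "Paris" near the top of the results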

Preliminaries

Like the original implementation of word2vec, Spark’s implementation uses a window of ±5 surrounding words (this is not user-configurable) and defaults to discarding all words that appear fewer than 5 times (this threshold is user-configurable). Both of these assumptions seem sane for the sort of training “sentences” that appear in log messages, but on their own they won’t be sufficient to get useful tokens out of those messages.
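
If the default frequency threshold doesn’t suit your corpus, recent versions of Spark let you adjust it when constructing the model; here’s a minimal sketch (setMinCount may not be available in older releases):

import org.apache.spark.mllib.feature.Word2Vec

// keep only words that appear at least 10 times in the corpus
val stricterW2v = new Word2Vec().setMinCount(10)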

Spark doesn’t provide a lot of tools for tokenizing and preprocessing natural-language text.1 Simple string splitting is as ubiquitous in trivial language-processing examples as it is in trivial word count examples, but it’s not going to give us the best results. Fortunately, there are some minimal steps we can take to start getting useful tokens out of log messages. We’ll look at these steps and see what motivates them now.

What is a word?

Let’s first consider what kinds of tokens might be interesting for analyzing the content of log messages. At the very least, we might care about:

  1. dictionary words,
  2. trademarks (which may or may not be dictionary words),
  3. technical jargon terms (which may or may not be dictionary words),
  4. service names (which may or may not be dictionary words),
  5. symbolic constant names (e.g., ENOENT and OPEN_MAX),
  6. pathnames (e.g., /dev/null), and
  7. programming-language identifiers (e.g., OutOfMemoryError and Kernel::exec).

For this application, we’re less interested in the following kinds of tokens, although it is possible to imagine other applications in which they might be important:

  1. hostnames,
  2. IPv4 and IPv6 addresses,
  3. MAC addresses,
  4. dates and times, and
  5. hex hash digests.

Preprocessing steps

If we’re going to convert sequences of lines to sequences of sequences of tokens, we’ll eventually be splitting strings. Before we split, we’ll collapse all runs of whitespace into single spaces so that we get more useful results when we do split. This isn’t strictly necessary — we could elect to split on runs of whitespace instead of single whitespace characters, or we could filter out empty strings from word sequences before training on them. But this makes for cleaner input and it makes the subsequent transformations a little simpler.

Here’s Scala code to collapse runs of whitespace into a single space:

// returns a function that replaces every match of r in its argument with s
def replace(r: scala.util.matching.Regex, s: String) = { (orig: String) => r.replaceAllIn(orig, s) }
val collapseWhitespace = replace(new scala.util.matching.Regex("[\\s]+"), " ")

The next thing we’ll want to do is eliminate all punctuation from the ends of each word. An appropriate definition of “punctuation” will depend on the sorts of tokens we wind up deciding are interesting, but I considered punctuation characters to be anything except:

  1. alphanumeric characters,
  2. dashes, and
  3. underscores.

Whether or not we want to retain intratoken punctuation depends on the application; there are good arguments to be made for retaining colons and periods (MAC addresses, programming-language identifiers in stack traces, hostnames, etc.), slashes (paths), at-signs (email addresses), and other marks as well. I’ll be retaining these marks but stripping all others. After these transformations, we can split on whitespace and get a relatively sensible set of tokens.

Here’s Scala code to strip punctuation from lines:

// matches characters we strip everywhere: anything that isn't alphanumeric, '-', '_', '.', '/', ':', or '@'
val rejectedIntratokenPunctuation = new scala.util.matching.Regex("[^A-Za-z0-9-_./:@]")
// matches runs of punctuation at the beginning of a token (preceded by whitespace or the start of the line)
val leadingPunctuation = new scala.util.matching.Regex("(\\s)[^\\sA-Za-z0-9-_/]+|()^[^\\sA-Za-z0-9-_/]+")
// matches runs of punctuation at the end of a token (followed by whitespace or the end of the line)
val trailingPunctuation = new scala.util.matching.Regex("[^\\sA-Za-z0-9-_/]+(\\s)|()[^\\sA-Za-z0-9-_/]+$")

val stripPunctuation: String => String =
  replace(leadingPunctuation, "$1") compose
  replace(trailingPunctuation, "$1") compose
  replace(rejectedIntratokenPunctuation, "")

In order to filter out strings of numbers, we’ll reject all tokens that don’t contain at least one letter. (We could be stricter and reject all tokens that don’t contain at least one letter that isn’t a hex digit, but I decided to be permissive in order to avoid rejecting interesting words that only contain letters A-F.)

// matches any token that contains at least one letter
val oneletter = new scala.util.matching.Regex(".*([A-Za-z]).*")

Here’s what our preprocessing pipeline looks like, assuming an RDD of log messages called messages:

def tokens(s: String, post: String => String = identity[String]): Seq[String] =
  collapseWhitespace(s)
    .split(" ")
    .map(w => post(stripPunctuation(w)))
    .collect { case token @ oneletter(_) => token }

val tokenSeqs = messages.map(line => tokens(line))

Now we have a sequence of words for each log message and are ready to train a word2vec model.

import org.apache.spark.mllib.feature.Word2Vec
val w2v = new Word2Vec

val model = w2v.fit(tokenSeqs)

Note that there are a few things we could be doing in our preprocessing pipeline but aren’t, like using a whitelist (for dictionary words or service names), or rejecting stopwords. This approach is pretty basic, but it produces some interesting results in any case.
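
For example, rejecting stopwords could be as simple as the following sketch (the word list here is illustrative rather than a real stopword list):

val stopwords = Set("the", "a", "an", "of", "to", "and", "in", "is", "for")
val filteredSeqs = tokenSeqs.map(_.filterNot(t => stopwords(t.toLowerCase)))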

Results and conclusions

I evaluated the model by using it to find synonyms for (more or less) arbitrary words that appeared in log messages. Recall that word2vec basically models words by the contexts in which they might appear; informally, synonyms are thus words with similar contexts.
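
The synonym lists below came from queries along these lines (the query word and neighbor count are illustrative):

model.findSynonyms("nova", 10).foreach { case (synonym, cosineSimilarity) =>
  println(s"$synonym\t$cosineSimilarity")
}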

  • The top synonyms for nova (the OpenStack compute service) included vm, glance, containers, instances, and images — all of these are related to running OpenStack compute jobs.
  • The top synonyms for volume included update, cinder.scheduler.host_manager, and several UUIDs for actual volumes.
  • The top synonyms for tmpfs included type, dev, uses, initialized, and transition.
  • The top synonyms for sh included /usr/bin/bash, _AUDIT_SESSION, NetworkManager, _SYSTEMD_SESSION, postfixqmgr.
  • The top synonyms for password included publickey, Accepted, opened, IPMI, and the name of an internal project.

These results aren’t earth-shattering — indeed, they won’t even tell you where to get a decent burrito — but they’re interesting, they’re sensible, and they point to the effectiveness of word2vec even given a limited, unidiomatic corpus full of odd word-like tokens. Of course, one can imagine ways to make our preprocessing more robust. Similarly, there are certainly other ways to generate a training corpus for these words: perhaps using the set of all messages for a particular service and severity as a training sentence, using the documentation for the services involved, using format strings present in the source or binaries for the services themselves, or some combination of these.

Semantic modeling of terms in log messages is obviously useful for log analytics: it can be used as part of a pipeline to classify related log messages by topic, in feature engineering for anomaly detection, and to suggest alternate search terms for interactive queries. However, it is a pleasant surprise that we can train a competent word2vec model for understanding log messages from the uncharacteristic utterances that make up those messages.


  1. Spark does provide a stopword filter for English and there are external libraries to fill in some of its language-processing gaps. In particular, I’ve had good luck with the Porter stemmer implementation from Chalk.

Consider the following hypothetical conference session abstract:

Much like major oral surgery, writing talk abstracts is universally acknowledged as difficult and painful. This has never been more true than it is now, in our age of ubiquitous containerization technology. Today’s aggressively overprovisioned, multi-track conferences provide high-throughput information transfer in minimal venue space, but do so at a cost: namely, they impose stingy abstract word limits. The increasing prevalence of novel “lightning talk” tracks presents new challenges for aspiring presenters. Indeed, the time it takes to read a lightning talk abstract may be a substantial fraction of the time it takes to deliver the talk! The confluence of these factors, inter alia, presents an increasingly hostile environment for conference talk submissions in late 2015. Your talk proposals must adapt to this changing landscape or face rejection. Is there a solution?

Hopefully, you recognize in this example some key elements of subpar abstracts that you’ve seen, reviewed, or — maybe, alas — even submitted.

To identify what’s fundamentally wrong with it, we should first consider what the primary rhetorical aims for an abstract are. In particular, an abstract needs to

  1. provide context so that a general audience can understand that the problem the talk addresses is interesting,
  2. summarize the content of a talk so that audiences and reviewers know what to expect, and
  3. motivate conference attendees to put the talk on their schedule (and, more immediately, motivate the program committee to accept the talk).

The abstract above does none of these things, for both stylistic and structural reasons.

The example abstract’s prose is generally clunky, but the main stylistic problem is its overuse of jargon and enthymemes. If you don’t already spend time in the same neighborhoods of the practice as the author, you probably don’t understand all of these terms to mean the same things that the author does or agree with his or her sense of what is “universally acknowledged.” It is easy to fall into using jargon when you’re deep in a particular problem domain: after all, most of the people you interact with use these words and you all seem to understand each other. However, jargon terms are essentially content-free: they convey nothing new to specialists and are completely opaque to novices. By propping up your writing on these empty terms instead of explaining yourself, you are excluding the cohort of your audience who doesn’t already understand your problem and shamelessly pandering to the cohort that does.1

The main structural problem with the example abstract is that it doesn’t actually make an argument for why the talk is interesting or worth attending; instead, it focuses on emphasizing the problems faced by abstract writers and ends with a cliffhanger. (The cliffhanger strategy not only adds no additional content, it is also especially risky.) A surprising number of abstracts, even accepted ones, suffer because they focus on only one or two of an abstract’s responsibilities, but it is possible to set your abstract up for success by starting from a structure that is designed to cover all of the abstract’s responsibilities.

In 1993, Kent Beck appeared on a panel on how to get a paper accepted at OOPSLA. OOPSLA (now called SPLASH) was an academic conference on research and development related to object-oriented programming languages, systems and environments to support object-oriented programming, and applications developed using these technologies. This is a particularly broad mandate, and because OOPSLA attracted so many papers on a wide range of topics, it had an extremely low acceptance rate. (This is probably why they held a panel on getting papers accepted, but it also makes OOPSLA a good analogy for contemporary practice-focused technical conferences that often cross several areas of specialization, e.g., data processing, distributed computing, and machine learning.)

Beck’s advice is worth reading even if you aren’t writing an academic conference paper. In particular, he suggests that you start by identifying a single “startling sentence” that summarizes your work and can grab the attention of the program committee. From there, Beck advises that you adopt the following four-sentence model to structure your abstract:

  1. The first sentence is the problem you’re trying to solve,
  2. The second sentence provides context for the problem or explains its impact,
  3. The third sentence is the “startling sentence” that is the key insight or contribution of your work, and
  4. The fourth sentence shows how the key contribution of your work affects the problem.

I’ve used this template in almost every abstract I’ve written for many years, although I sometimes devote more than a single sentence to each step. It has successfully helped me refine abstracts for both industry conference talks and academic papers, and it more or less ensures that each abstract accomplishes what it needs to. (If you’re writing a talk abstract, as opposed to a paper abstract, it’s sometimes also a good idea to add a sentence or two covering what the audience should expect to take away from your talk and why you’re qualified to give it.) If I am sure to consider my audience — first, an overworked program committee member, and second, a jetlagged and overstimulated conference attendee — I am far more likely to explain things clearly and eschew jargon. As a bonus, starting from a fairly rigid structure frees me from wasting time worrying about how best to arrange my prose.

If we avoid jargon and start from Beck’s structure, we can transform the mediocre example abstract from the beginning of this post into something far more effective:

Contemporary multiple-track industry conferences attract speakers and attendees who specialize in distinct but related parts of the practice. Since many authors adopt ineffective patterns from other technical abstracts they’ve read, they may unwittingly submit talk proposals that are at best rhetorically impotent and at worst nonsensical to people who don’t share their specialization. By starting from a simple template, prospective speakers can dramatically improve their chances of being understood, accepted, and attended, while also streamlining the abstract-writing process. Excellent abstracts benefit the entire community, because more people will be motivated to learn about interesting work that is outside of their immediate area of expertise. In this talk, delivered by someone who has delivered many talks without any serious train wrecks and has also helped other people get talks accepted, you’ll learn a straightforward technique for designing abstracts that communicate effectively to a general audience, sell your talk to the program committee, and motivate your peers to attend your talk.

Delivering a technical talk has a lot in common with running a half-marathon or biking a 40k time trial. You’re excited and maybe a little nervous, you’re prepared to go relatively hard for a relatively long time, and you’re acutely aware of the clock. In both situations, you might be tempted to take off right from the gun, diving into your hardest effort (or most technical material), but this is a bad strategy.

By going out too hard in the half-marathon, you’ll be running on adrenaline instead of on your aerobic metabolism, will burn matches by working hard before warming up fully, and ultimately won’t be able to maintain your best possible pace because you’ll be spent by the second half of the race. Similarly, in the talk, your impulse might be to get right to the most elegant and intricate parts of your work immediately after introducing yourself, but if you get there without warming up the audience first, you’ll lose most of them along the way. In both cases, your perception of what you’re doing is warped by energy and nerves; the right pace will feel sluggish and awkward; and starting too fast will put you in a hole that will be nearly impossible to recover from.

Delivering a technical talk successfully has a lot in common with choosing an appropriate pacing strategy for an endurance event: by starting out slower than you think you need to, you’ll be able to go faster at the end. Most runners1 will be able to maintain a higher average pace by doing negative splits. In a race, this means you start out slower than your desired average pace and gradually ramp up over the course of the race so that by the end, you’re going faster than your desired average pace. By starting out easy, your cardiovascular system will warm up, your connective tissue will get used to the stress of pounding on the pavement, and your muscles will start buffering lactic acid; this will reduce muscle fatigue and save your anaerobic energy for the final sprint.

You can apply the general strategy of negative splits to a talk as well. Instead of warming up cold muscles and your aerobic energy systems before making them work, you’re preparing a group of smart people to learn why they should care about your topic before making them think about it too much. Start off slow: provide background, context, and examples. Unless you’re a very experienced speaker, this will feel agonizingly slow at first.

It’s understandable that it might feel remedial and boring to you to explain why your work is relevant. After all, you’re deep in your topic and have probably long since forgotten what it was like to learn about it for the first time. Examples and visual explanations might seem like a waste of time before you get to your clever implementation, elegant proof, or sophisticated model. You have some serious detail to cover, after all! Your audience, however, isn’t prepared for that detail yet. If you skip the warm-up and go straight to that detail, you’ll lose audience engagement, and it’s nearly impossible to recover from that; it’ll certainly prevent you from covering as much as you might have otherwise wanted to.

Remember that your audience is made up of smart people who chose to attend your talk instead of sitting out in the hall. They’d probably rather be learning something from you than halfheartedly reading email. But they also almost certainly don’t know as much about your topic as you do. Ease them in to it, warm them up, and give them plenty of context first. You’ll be able to cover more ground that way.


  1. Pacing in cycling time trials can be a little more complicated depending on the terrain and wind, but in general being able to finish stronger than you started is still desirable.

Brandenburger Tor lightshow

I was in Berlin last week for Flink Forward, the inaugural Apache Flink conference. I’m still learning about Flink, and Flink Forward was a great place to learn more. In this post, I’ll share some of what I consider its coolest features and highlight some of the talks I especially enjoyed. Videos of the talks should all be online soon, so you’ll be able to check them out as well.

Background

Apache Flink is a data processing framework for the JVM that is most popular for streaming workloads with high throughput and low latency, although it is also general enough to support batch processing. Flink has a pleasant collection-style API, offers stateful elementwise transformations on streams (think of a fold function), can be configured to support fault-tolerance with exactly-once delivery, and does all of this while achieving extremely high performance. Flink is especially attractive for use in contemporary multitenant environments because it manages its own memory and thus Flink jobs can run well in containers on overprovisioned systems (where CPU cycles may be relatively abundant but memory may be strictly constrained).
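
To give a rough sense of the collection-style API, here’s a minimal word-count sketch using Flink’s Scala DataSet API (the object name and input strings are made up for illustration; this isn’t code from any of the talks):

import org.apache.flink.api.scala._

object WordCountSketch {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "flink forward was held in berlin",
      "flink is a data processing framework for the jvm")
    val counts = text
      .flatMap(_.toLowerCase.split("\\W+"))  // split lines into words
      .map((_, 1))                           // pair each word with a count of 1
      .groupBy(0)                            // group by the word
      .sum(1)                                // sum the counts
    counts.print()
  }
}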

Keynotes and lightning talks

Kostas Tzoumas and Stephan Ewen (both of data Artisans) shared a keynote in which they presented the advancements in Flink 0.10 (to be released soon) and laid out the roadmap for the next release, which will be Flink 1.0. The most interesting parts of this keynote for me were the philosophical arguments for the generality and importance of stream processing in contemporary event-driven data applications. Many users of batch-processing systems simulate streaming workflows by explicitly encoding windows in the structure of their input data (e.g., by using one physical file or directory to correspond to a day’s, month’s, or year’s worth of records) or by using various workarounds inspired by technical limitations (e.g., the “lambda architecture” or bespoke but narrowly-applicable stream processors). However, mature stream processing frameworks not only enable a wide range of applications that process live events, but they also are general enough to handle batch workloads as a special case (i.e., by processing a stream with only one window).1

Of course, the workarounds that data engineers have had to adopt to handle streaming data in batch systems are only necessary given an absence of mature stream processing frameworks. The streaming space has improved a great deal recently, and this talk gave a clear argument that Flink was mature enough for demanding and complex applications. Flink offers a flexible treatment of time: events can be processed immediately (one at a time), in windows based on when the events arrived at the processor, or in windows based on when the events were actually generated (even if they arrived out of order). Flink supports failure recovery with exactly-once delivery but also offers extremely high throughput and low latency: a basic Flink stream processing application offers two orders of magnitude more throughput than an equivalent Storm application. Flink also provides a batch-oriented API with a collection-style interface and an optimizing query planner.

After the keynote, there were several lightning talks. Lightning talks at many events are self-contained (and often speculative, provocative, or describing promising work in progress). However, these lightning talks were abbreviated versions of talks on the regular program. In essence, they were ads for talks to see later (think of how academic CS conference talks are really ads for papers to read later). This was a cool idea and definitely helped me navigate a two-track program that was full of interesting abstracts.

Everyday Flink

Michael Häusler of ResearchGate gave a talk about the process of evaluating new data processing frameworks, focusing in particular on determining whether a framework makes simple tasks simple. (Another step, following Alan Kay’s famous formulation, is determining whether or not a framework makes complex tasks possible.) The “simple task” that Häusler set out to solve was finding the top 5 coauthors for every author in a database of publications; he implemented this task in Hive (running on Tez), Hadoop MapReduce, and Flink. Careful readers will note that this is not really a fair fight: SQL and HiveQL do not admit straightforward implementations of top-k queries, and MapReduce applications are not known for elegant and terse codebases; indeed, Häusler acknowledged as much. However, it was still impressive to see how little code was necessary to solve this problem with Flink, especially when contrasted with the boilerplate of MapReduce or all of the machinery to implement a user-defined aggregate function to support top-k in Hive. The Flink solution was also twice as fast as the custom MapReduce implementation, which was in turn faster than Hive on Tez.

Declarative Machine Learning with the Samsara DSL

Sebastian Schelter introduced Samsara, a DSL for machine learning and linear algebra. Samsara supports in-memory vectors (both dense and sparse), in-memory matrices, and distributed row matrices, and provides an R-like syntax embedded in Scala for operations. The distributed row matrices are a unique feature of Samsara; they support only a subset of matrix operations (i.e., ones that admit efficient distributed implementations) and go through a process of algebraic optimization (including generating logical and physical plans) to minimize communication during execution. Samsara can target Flink, Spark, and H2O.

Streaming and parallel decision trees in Flink

Training decision trees in batch frameworks requires a view of the entire learning set (and sufficient training data to generate a useful tree). In streaming applications, each event is seen only once, the classifier must be available immediately (even if there is little data to train on), and the classifier should take feedback into account in real time. In this talk, Anwar Rizal of Amadeus presented a technique for training decision trees on streaming data by building and aggregating approximate histograms for incoming features and labels.

Juggling with bits and bytes — how Apache Flink operates on binary data

Applications using the Java heap often exhibit appalling memory efficiency; the heap footprint of Java library data structures can be 75% overhead or more. Since data processing applications frequently create, manipulate, and serialize many objects — some of which may be quite short-lived — there are potentially significant performance pitfalls to using the JVM directly for memory allocation. In this talk, Fabian Hueske of data Artisans presented Flink’s approach: combining a custom memory-management and serialization stack with algorithms that operate directly on compressed data. Flink jobs are thus more memory-efficient than programs that use the Java heap directly, exhibit predictable space usage, and handle running out of memory gracefully by spilling intermediate results to disk. In addition, Flink’s use of database-style algorithms to sort, filter, and join compressed data reduces computation and communication costs.

Stateful Stream Processing

Data processing frameworks like Flink and Spark support collection-style APIs where distributed collections or streams can be processed with operations like map, filter, and so on. In addition to these, it is useful to support transformations that include state, analogously to the fold function on local collections. Of course, fold by itself is fairly straightforward, but a reliable fold-style operation that can recover in the face of worker failures is more interesting. In this talk, Márton Balassi and Gábor Hermann presented an overview of several different approaches to supporting reliable stream processing with state: the approaches used by Flink (both versions 0.9.1 and 0.10), Spark Streaming, Samza, and Trident. As one might imagine, Spark Streaming and Samza get a lot of mileage out of delegating to underlying models (immutable RDDs in Spark’s case and a reliable unified log in Samza’s). Flink’s approach of using distributed snapshots exhibits good performance and enables exactly-once semantics, but it also seems simpler to use than alternatives. This has become a recurring theme in my investigation of Flink: technical decisions that are advertised as improving performance (latency, throughput, etc.) also, by happy coincidence, admit a more elegant programming model.

Fault-tolerance and job recovery in Apache Flink

This talk was an apt chaser for the Stateful Stream Processing talk. Till Rohrmann presented Flink’s approaches to checkpointing and recovery, showing how Flink can be configured to support at-most-once delivery (the weakest guarantee), at-least-once delivery, or exactly-once delivery (the strongest guarantee). The basic approach Flink uses for checkpointing operator state is the Chandy-Lamport snapshot algorithm, which enables consistent distributed snapshots in a way that is transparent to the application programmer. This approach also enables configurable tradeoffs between throughput and snapshot interval, but it’s far faster (and nicer to use) than Storm’s approach in any case. Recovering operator state is only part of the fault-tolerance picture, though; Till’s talk also introduced Flink’s approach for supporting a highly-available Job Manager.

Other talks worth checking out

Here are a few talks that I’d like to briefly call out as worth watching:

General notes

Kulturbrauerei

The data Artisans team and the Flink community clearly put a lot of hard work towards making this a really successful conference. The venue (pictured above) was unique and cool, the overall vibe was friendly and technical, and I didn’t see a single talk that I regretted attending. (This is high praise indeed for a technical conference; I may have been lucky, but I suspect it’s more likely that the committee picked a good program.) I especially appreciated the depth of technical detail in the talks by Flink contributors on the second afternoon, covering both design tradeoffs and implementation decisions. I’m hoping to be back for a future iteration.


  1. Indeed, as we saw later in the conference, the difference between “batch” workloads and “streaming” workloads can be as simple as a set of policy decisions by the scheduler.

I’m speaking at Spark Summit today about using Spark to analyze operational data from the Fedora project. Here are some links to further resources related to my talk:

You should also check out my team’s Silex library, which contains useful code factored out of real Spark applications we’ve built in Red Hat’s Emerging Technology group. It includes a lot of cool functionality, but the part I mentioned in the talk is this handy interface for preprocessing JSON data before inferring a schema.

In this post, we’ll see how to make a simple transformer for Spark ML Pipelines. The transformer we’ll design will generate a sparse binary feature vector from an array-valued field representing a set.

Preliminaries

The first thing we’ll need to do is expose Spark’s user-defined type for vectors. This will enable us to write a user-defined data frame function that returns a Spark vector. (We could also implement our own user-defined type, but reusing Spark’s, which is currently private to Spark, will save us some time. By the time you read this, the type may be part of Spark’s public API — be sure to double-check!)

package org.apache.spark.hacks {
  // make VectorUDT available outside of Spark code
  type VectorType = org.apache.spark.mllib.linalg.VectorUDT
}

Imports

Here are the imports we’ll need for the transformer and support code. I’ll use VEC for Spark vectors to avoid confusion with Scala’s Vector type. We’ll assume that the VectorType code from above is available on your project’s classpath.

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

import org.apache.spark.ml.param._
import org.apache.spark.ml.Transformer

import org.apache.spark.mllib.linalg.{Vector => VEC, Vectors}

import org.apache.spark.hacks.VectorType

Transformer and support code

Most of the ML pipeline classes distributed with Spark follow the convention of putting groups of related pipeline stage parameters in a trait. We’ll do this as well, declaring a trait for the three parameters that our transformer will use: the name of the input column, the name of the output column, and the maximum number of elements our sparse vector can hold. We’ll also define a convenience method to return a triple of the parameter values we care about.

trait SVParams extends Params {
  val inputCol = new Param[String](this, "inputCol", "input column")
  val outputCol = new Param[String](this, "outputCol", "output column")
  val vecSize = new IntParam(this, "vecSize", "maximum sparse vector size")

  def pvals(pm: ParamMap) = (
    // this code can be cleaner in versions of Spark after 1.3
    pm.get(inputCol).getOrElse("topicSet"),
    pm.get(outputCol).getOrElse("features"),
    pm.get(vecSize).getOrElse(128)
  )
}

Note that Spark 1.4 supports calling getOrElse directly on a ParamMap instance, so you can slightly simplify the code in pvals if you don’t care about source compatibility with Spark 1.3.
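
For example, under Spark 1.4 the helper above could be written roughly as follows (a sketch with the same defaults):

def pvals(pm: ParamMap) = (
  pm.getOrElse(inputCol, "topicSet"),
  pm.getOrElse(outputCol, "features"),
  pm.getOrElse(vecSize, 128)
)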

Here’s what the actual transformer implementation looks like:

class SetVectorizer(override val uid: String)
    extends Transformer with SVParams {
  val VT = new org.apache.spark.hacks.VectorType()

  def transformSchema(schema: StructType, params: ParamMap) = {
    val outc = params.get(outputCol).getOrElse("features")
    StructType(schema.fields ++ Seq(StructField(outc, VT, true)))
  }

  def transform(df: DataFrame, params: ParamMap) = {
    val (inCol, outCol, maxSize) = pvals(params)
    df.withColumn(outCol, callUDF({ a: Seq[Int] =>
      Vectors.sparse(maxSize, a.toArray, Array.fill(a.size)(1.0))
    }, VT, df(inCol)))
  }
}

The first thing we do in the transformer class is declare an instance of VectorType to use in other data frame type declarations later in the class. The transformSchema method returns the schema after applying this transformer to a given data frame; it creates a new data frame schema that includes all of the fields from the original frame as well as a Vector-valued field whose name is given by the outputCol parameter. Finally, the transform method creates a new data frame with an additional column (again, named with the value of the outputCol parameter); its values are the result of applying a user-defined function to each row in the data frame, taking arguments from the input column. The function itself simply creates a sparse binary vector from an array-backed set, so that the array-backed set Array(1,2,4,8) would become a sparse vector with the elements at indices 1, 2, 4, and 8 set to 1 and everything else set to 0.

The code above is a reasonable starting point for your own transformers, but you’ll want to add error checking to code you use in production: at a minimum, you’d need to validate the schema of the input data frame (to ensure that expected columns exist and are of the correct type), verify that the output column name doesn’t already exist in the data frame, and make sure no input array has more than vecSize elements. I hope this code is helpful as you develop your own pipeline stages!
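
As a rough sketch of what that validation might look like (the checks and error messages here are illustrative, checking actual array sizes against vecSize would require scanning the data rather than the schema, and this relies on the imports shown earlier):

def validateInput(schema: StructType, inCol: String, outCol: String) {
  require(schema.fieldNames.contains(inCol), s"input column '$inCol' does not exist")
  require(!schema.fieldNames.contains(outCol), s"output column '$outCol' already exists")
  schema(inCol).dataType match {
    case ArrayType(IntegerType, _) => ()  // expected: an array of integer indices
    case other => sys.error(s"expected array<int> for '$inCol' but found $other")
  }
}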

This post will show you an extremely simple way to make quick-and-dirty Bokeh plots from data you’ve generated in Spark, but the basic technique is generally applicable to any data that you’re generating in some application that doesn’t necessarily link in the Bokeh libraries.

Getting started

We’ll need to have a recent version of Bokeh installed and some place to put our data. If you don’t already have Bokeh installed, use virtualenv to get it set up:

virtualenv ~/.bokeh-venv
source ~/.bokeh-venv/bin/activate
pip install bokeh numpy pandas

This will download and build a nontrivial percentage of the Internet. Consider getting some coffee or catching up with a colleague while it does its thing.

The next thing we’ll need is some place to stash data we’ve generated. I’ll use a very simple service that I developed just for this kind of application, but you can use whatever you want as long as it will let you store and retrieve JSON data from a given URL.

With all that in place, we’re ready to make a simple plot.

A basic static-HTML plot

We’re going to use Bokeh to construct a basic line graph in a static HTML file. The catch is that instead of specifying the data statically, we’re going to specify it as a URL, which the page will poll so it can update the plot when it changes. Here’s what a script to construct that kind of plot looks like:

#!/usr/bin/env python

from bokeh.plotting import figure, output_file, show
from bokeh.models.sources import AjaxDataSource
output_file("json.html", title="data polling example")
source = AjaxDataSource(data_url="http://localhost:4091/tag/foo", polling_interval=1000)
p = figure()
p.line('x', 'y', source=source)
show(p)

Note that the above example assumes you’re using the Firkin service as your data store. If you’re using something else, you will probably need to change the data_url parameter to the AjaxDataSource constructor.

Publishing and updating data

Now we’ll fire up the Firkin server and post some data. From a checkout of the Firkin code, run sbt server/console and then create a connection to the server using the client library:

val client = new com.freevariable.firkin.Client("localhost", 4091)

You can now supply some values for x and y. We’ll start simple:

/* publish an object to http://localhost:4091/tag/foo */
client.publish("foo", """{"x": [1,2,3,4,5,6,7,8,9,10], "y": [2,4,6,8,10,12,14,16,18,20]}""")

If you don’t already have json.html from the previous step loaded in your browser, fire it up. You should see a plot that looks like this:

plot of y=2x for 1..10

If we update the data stored at this URL, the plot will update automatically. Try it out; publish a new data set:

client.publish("foo", """{"x": [1,2,3,4,5,6,7,8,9,10], "y": [2,4,6,8,10,12,14,16,18,30]}""")

The plot will refresh, reflecting the updated y value:

plot of y=2x for 1..9 and y=3x for x=10

(Again, if you’re using some other JSON object cache to store your data sets, the principles will be the same but the syntax won’t be.)

Connecting Bokeh to Spark

Once we have a sink for JSON data, it’s straightforward to publish data to it from Spark code. Here’s a simple example, using json4s to serialize the RDDs:

import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

// assume that spark is a SparkContext object, declared elsewhere
val x = spark.parallelize(1 to 100)
// x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>

val y = x.map(_ / 4)
// y: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>

val json = ("x" -> x.collect.toList) ~ ("y" -> y.collect.toList)

val client = new com.freevariable.firkin.Client("localhost", 4091)

client.publish("data", compact(render(json)))

After we do this, the data will be available at http://localhost:4091/tag/data.

Alternate approaches

The approach described so far in this post was designed to provide the simplest possible way to make a basic plot backed by a relatively small amount of dynamic data. (If we have data to plot that we can’t store in memory on a single node, we’ll need something more sophisticated.) It is suitable for prototyping or internal use. Other approaches, like the two below, might make more sense for other situations:

  1. Bokeh offers several alternate language bindings, including one for Scala. This means that you can define plots and supply data for them from within applications that link these libraries in. This is the approach that spark-notebook takes, for example.
  2. Another possibility is setting up the Bokeh server and publishing plots and data to it. The Bokeh server offers several additional capabilities, including support for scalable visualizations (e.g., via sampling large datasets) and support for multiple authenticated users.

Most people set personal and professional goals. If you work in software, your near-term professional goals might sound like this:

  • Ship the next version of our team’s product without any undiscovered crashing bugs
  • Improve the performance of the caching layer by 10%
  • Become a committer on some particular open-source project

Those probably sound pretty reasonable, right? They aren’t vague wishes, they’re goals. They probably aren’t totally unrealistic, and you’ll know you’ve achieved them when some objective, measurable thing happens. (They’re SMART, in other words.)

We’ll come back to these goals. First, though, I’d like to digress and talk about bike racing, and specifically about the kinds of goals an amateur racer might set for a year. Maybe those would look like this:

  • Stay healthy and avoid injuries all season
  • Improve my one-minute power output by 0.5 W/kg
  • Upgrade my racing category

These are more or less analogous to the professional goals. They’re objective, measurable, and reasonable. However, they’re also at least partially out of your control. You can’t totally prevent yourself from getting sick or crashing. Your one-minute power might be right at the limits of your genetic capabilities (or maybe your season shapes up so that it makes more sense to focus on improving other aspects of your fitness). And the category upgrade is up to an official in your local cycling association — you can get an automatic upgrade by winning races, but whether or not you win is also largely out of your control.

I first read about the idea of process and outcome goals in Joe Friel’s The Cyclist’s Training Bible, but I’ve found them extremely helpful in talking about professional goals as well. The idea is that there are two kinds of goals: outcome goals, which are results, and process goals, which relate to how you prepare and execute. Of course, you’re ultimately interested in outcomes (winning races, shipping great products, becoming recognized as an authority in your field), but outcomes are always to some extent out of your control.

Instead of focusing on outcomes for goals, it makes more sense to focus on what you can control: the processes that support those outcomes. So, in the bike case, you could recast the outcome goals as:

  • Make sure I get at least 7 hours of sleep a night and avoid sketchy racers in corners or sprints
  • Do plyometric exercises at least twice a week in the offseason
  • Identify key races that I can do well in and make sure to show up for them well-rested and with adequate nutrition; then focus on avoiding particular tactical mistakes I’ve made in past races

The software goals I mentioned at the beginning of the article are also outcome goals. You can’t guarantee that your product won’t have any undiscovered crashing bugs, but you can sure put processes in place so that you’re more likely to discover crashing bugs before you ship. Similarly, a 10% performance improvement sounds great, but it might require unjustifiable effort or be unachievable (conversely, it might turn out that 10% improvement is insufficient to improve the behavior of the whole system)! Community leadership positions always involve political as well as meritocratic considerations; while you can make a case for yourself as a good candidate, you can’t force people to vote for you.

Recast as process goals, the software goals might look like this:

  • Ensure that no new commit to our product introduces untested code, use property-based tests for all internal data structures, and incorporate fuzzing of every public interface into the QA process.
  • Profile the caching layer under a range of conditions and document the hot spots, identifying algorithmic and implementation-level avenues for performance improvement. Add performance testing to CI.
  • Participate in the project community, answering questions on the mailing list every week and submitting at least one nontrivial patch a week.

It’s easy to imagine the rest of your career and think about outcomes you want to see: earning a prestigious position, achieving acknowledged impact in your department or company, or writing a popular framework (or book).1 But those goals are often pretty far off from what you need to do to get there. Setting process goals supports the outcomes that you want to achieve. But the really remarkable thing is that they do so much more than that. By focusing on improving your preparation and execution, you’re making yourself a better engineer (or bike racer) even if you don’t get the outcomes you want right away — and making it more likely you’ll see great outcomes in the future.


  1. Outcome goals are a particularly dangerous trap if you’re coming from an environment like academia or open-source development, where outcome-related metrics like tenure, citation count, and number of GitHub “stars” are never far from your evaluation of your own career. (The frictionless recognitions afforded by social media “likes” and “favorites” bring this headache to a much broader audience.)

Elasticsearch has offered Hadoop InputFormat and OutputFormat implementations for quite some time. These made it possible to process Elasticsearch indices with Spark just as you would any other Hadoop data source. Here’s an example of this in action, taken from Elastic’s documentation:

Elasticsearch in Spark via Hadoop
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io.{MapWritable, Text}
import org.elasticsearch.hadoop.mr.EsInputFormat

val conf = new JobConf()
conf.set("es.resource", "radio/artists")
conf.set("es.query", "?q=me*")
val esRDD = sc.hadoopRDD(conf,
                classOf[EsInputFormat[Text, MapWritable]],
                classOf[Text], classOf[MapWritable])
val docCount = esRDD.count()

However, the latest versions of the Elasticsearch client offer more idiomatic Spark support to generate RDDs (and Spark data frames) directly from Elasticsearch indices without explicitly dealing with the Hadoop formatting machinery. This support is extremely cool, but the documentation is still a bit sparse and I hit a few snags using it. In this post, I’ll cover the problems I encountered and explain how to make it work.

Dependencies

Native Spark support is available in elasticsearch-hadoop version 2.1.0. If you’d tried to set this up last week, you’d have needed to run against a snapshot build to get support for Spark data frames. Fortunately, the most recent beta release of elasticsearch-hadoop includes this support. Add the following library dependency to your project:

sbt dependency configuration
libraryDependencies += "org.elasticsearch" %% "elasticsearch-spark" % "2.1.0.Beta4"

Since we won’t be using the lower-level Hadoop support, that’s the only library we’ll need.

Configuration

We need to supply some configuration in order to create RDDs directly from indices. elasticsearch-spark expects this configuration to be available as parameters on our Spark context object. Here’s an example of how you’d do that in a vanilla Spark application:

Configuring an ES node list
val conf = new org.apache.spark.SparkConf()
 .setMaster("local[*]")
 .setAppName("es-example")
 .set("es.nodes", "localhost")

val spark = new org.apache.spark.SparkContext(conf)

Note that since the default value for es.nodes is localhost, you don’t need to set it at all if you’re running against a local Elasticsearch instance. If you were running Elasticsearch behind a reverse proxy running on proxyhost on port 80, you might specify proxyhost:80 instead (and you would probably also want to set es.nodes.discovery to false so your client wouldn’t discover nodes that weren’t reachable outside of the private network). There are other options you can set, but this is the most critical.
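
For example, the reverse-proxy scenario described above might be configured roughly like this (proxyhost is a placeholder hostname):

val proxiedConf = new org.apache.spark.SparkConf()
  .setAppName("es-example")
  .set("es.nodes", "proxyhost:80")
  .set("es.nodes.discovery", "false")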

If you’re using the Spark app skeleton provided by the Silex library, you can add Elasticsearch configuration in config hooks, as follows:

import com.redhat.et.silex.app.AppCommon
import org.elasticsearch.spark._

object IndexCount extends AppCommon {
  addConfig { conf =>
    conf
     .set("es.nodes", sys.env.getOrElse("ES_NODES", "localhost"))
     .set("es.nodes.discovery", "false")
  }

  override def appName = "example ES app"

  def appMain(args: Array[String]) {
    args match {
      case Array(index, indexType) =>
        val resource = s"$index/$indexType"
        val count = context.esRDD(resource).count()
        Console.println(s"$resource has $count documents")
      case _ =>
        Console.println("usage: IndexCount index type")
    }
  }
}

Generating RDDs

elasticsearch-spark offers implicit conversions to make RDDs directly from Elasticsearch resources. (Careful readers may have already seen one in action!) The following code example assumes that you’ve initialized spark as a SparkContext with whatever Elasticsearch configuration data you need to set:

Generating RDDs from ES resources
// bring implicit conversions into scope to extend our
// SparkContext with the esRDD method, which creates a
// new RDD backed by an ES index or query
import org.elasticsearch.spark._

// telemetry-20080915/sar is an ES index of system telemetry data
// (collected from the sar tool)
val sarRDD = spark.esRDD("telemetry-20080915/sar")

// => sarRDD: org.apache.spark.rdd.RDD[(String, scala.collection.Map[String,AnyRef])] = ScalaEsRDD[4] at RDD at AbstractEsRDD.scala:17

This is far more convenient than using the Hadoop InputFormat, but the interface still leaves a little to be desired. Our data may have a sensible schema, but the result we’re getting back is as general as possible: just a collection of key-value pairs, where the keys are strings and the values are maps from strings to arbitrary (and arbitrarily-typed) values. If we want more explicit structure in our data, we’ll need to bring it into Spark in a different way.

Generating data frames

In order to benefit from the structure in our data, we can generate data frames that store schema information. The most recent release of elasticsearch-spark supports doing this directly. Here’s what that looks like, again assuming that spark is an appropriately-configured SparkContext:

Generating data frames from ES resources
// bring implicit conversions into scope to extend our
// SQLContexts with the esDF method, which creates a
// new data frame backed by an ES index or query
import org.elasticsearch.spark.sql._

// construct a SQLContext
val sqlc = new org.apache.spark.sql.SQLContext(spark)

// telemetry-20080915/sar is an ES index of system telemetry data
// (collected from the sar tool)
val sarDF = sqlc.esDF("telemetry-20080915/sar")

// => org.apache.spark.sql.DataFrame = [_metadata: struct<file-date:timestamp,generated-by:string,generated-by-version:string,machine:string,nodename:string,number-of-cpus:int,release:string,sysdata-version:float,sysname:string>, cpu-load: string, cpu-load-all: string, disk: string, filesystems: string, hugepages: struct<hugfree:bigint,hugused:bigint,hugused-percent:double>, interrupts: string, interrupts-processor: string, io: struct<io-reads:struct<bread:double,rtps:double>,io-writes:struct<bwrtn:double,wtps:double>,tps:double>, kernel: struct<dentunusd:bigint,file-nr:bigint,inode-nr:bigint,pty-nr:bigint>, memory: struct<active:bigint,buffers:bigint,bufpg:double,cached:bigint,campg:double,commit:bigint,commit-percent:double,dirty:bigint,frmpg:double,inactive:bigint,memfree:bigint,memu...

Once we have a data frame for our ES resource, we can run queries against it in Spark:

sarDF.registerTempTable("sardf")

sqlc.sql("select _metadata from sardf")

Next steps

Elasticsearch’s Spark support offers many additional capabilities, including storing Spark RDDs to Elasticsearch and generating RDDs from queries. Check out the upstream documentation for more!
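
For instance, assuming the implicits from org.elasticsearch.spark._ are in scope and spark is an appropriately-configured SparkContext, storing an RDD and querying an index look roughly like the following sketch (the index names, case class, and query string are made up):

import org.elasticsearch.spark._

// store an RDD of case-class documents to an ES resource
case class Reading(nodename: String, load: Double)
val readings = spark.parallelize(Seq(Reading("node-1", 0.7), Reading("node-2", 1.2)))
readings.saveToEs("telemetry-demo/readings")

// build an RDD from only the documents matching a query
val filtered = spark.esRDD("telemetry-20080915/sar", "?q=nodename:node-1")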