One of my side projects this year has been using Apache Spark to make sense of my bike power meter data. There are a few well-understood approaches to bike power data modeling and analysis, but the domain has been underserved by traditional machine learning approaches, and I wanted to see if I could quickly develop some novel techniques. My experiments so far are available as open-source code and they have been quite successful in two ways: not only have I mined some useful information from my rides, but I’ve also demonstrated that Spark is responsive enough to be a good substitute for R in exploratory use cases.
However, the code I’ve produced up to this point has really been more focused on experimenting with analytic techniques rather than on scalable implementations. I’ve valued obvious correctness and simplicity over performance, in some cases making implementation decisions that were obviously suboptimal in the interest of having simple implementations. (The blessing and the curse of declarative programming is that you’re insulated, to some extent, from how your application actually executes!)
Since publishing the code, giving talks about it, and demoing it, I’ve decided that the basic approach is fairly sensible. I’ve also seen a lot of interest from teammates and other cyclists who’d like to analyze their own data, and I’d like to eventually make the code available as a public-facing service – which means that it’s time to adjust my tradeoffs to favor efficiency a bit more.
In this post, we’ll look at several minor changes (and one major change) that improved my code’s performance, identifying the common issues that each uncovered. This certainly won’t be an exhaustive catalog of Spark optimization tips, and I’m not claiming that the code I wound up with is perfect, but keeping these issues in mind should improve your Spark apps. We’ll cover using broadcast variables to distribute data to workers, using caching sensibly, being sensitive to data shuffles, and designing applications to take advantage of laziness.
Background
The particular code we’re looking at is from one of the applications I discussed at Spark Summit this year.1 The question this app aims to answer is “given historical ride data, where am I likely to get my best mean n-second power?” Its basic approach is:
- load activity data from disk, transforming it into a collection of Trackpoint records,
- optimize a k-means clustering (with a relatively large k) for the spatial locations of each Trackpoint,
- find the mean power for each overlapping n-second window of each activity,
- for each pair (i, j) of spatial clusters, reject all but the best n-second efforts starting in cluster i and ending in cluster j, and
- finally, plot the trackpoints from the best remaining 20 efforts.
The Trackpoint structure is designed to capture all the metrics2 in each sample recorded in a Garmin TCX file; it is a Scala case class with timestamp, coordinates, altitude, wattage, and activity identity. Once we’ve processed our input files into Trackpoint records, we’ll group these records by activity, giving us a collection of pairs that encode a mapping from activity names to sequences of trackpoints.
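The exact definition lives in the project’s source; as a rough sketch showing only the fields this post relies on (the field types here are assumptions, not the actual com.freevariable.surlaplaque.data.Trackpoint definition), it might look something like this:
case class Coordinates(lat: Double, lon: Double)

case class Trackpoint(
  timestamp: Long,          // assumed representation: milliseconds since the epoch
  latlong: Coordinates,     // spatial location of the sample
  altitude: Double,         // elevation of the sample
  watts: Double,            // power-meter reading for the sample
  activity: Option[String]  // identifier of the containing activity, if known
)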
Since dealing with windowed data is common to several of my bike data applications, I had factored it out into a common trait:
trait ActivitySliding {
  import org.apache.spark.rdd.RDD
  import com.freevariable.surlaplaque.data.Trackpoint

  def windowsForActivities[U](data: RDD[Trackpoint], period: Int, xform: (Trackpoint => U) = identity _) = {
    val pairs = data.groupBy((tp: Trackpoint) => tp.activity.getOrElse("UNKNOWN"))
      .flatMap {
        case (activity: String, stp: Seq[Trackpoint]) =>
          (stp sliding period).zipWithIndex.map { case (s, i) => ((activity, i), s.map(xform)) }
      }
    pairs
  }

  def identity(tp: Trackpoint) = tp
}
The windowsForActivities function maps the data into overlapping period-sample windows. Since it’s not possible, in general, to efficiently calculate sliding windows over RDDs3, we operate on materialized sequences of trackpoints and return pairs representing a mapping from pairs of activity identifiers and sample offsets to sequences representing sample windows.
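Concretely, the resulting collection looks something like the sketch below (the activity name and sample values are invented for illustration; only the shape matters):
// Shape of windowsForActivities output for period = 3 (hypothetical values):
//   (("2014-07-04-ride", 0), Seq(tp0, tp1, tp2))
//   (("2014-07-04-ride", 1), Seq(tp1, tp2, tp3))
//   (("2014-07-04-ride", 2), Seq(tp2, tp3, tp4))
//   ...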
The windowsForActivities function also takes an optional transformation function argument (xform), which can be used to map Trackpoint records after splitting them into sliding windows. In my case, I use these to discard metrics data that isn’t relevant to a given application. Since our first approach to finding best efforts involves keeping every effort window around until we know which ones we’ll ultimately care about,4 going to a leaner trackpoint structure can save a lot of memory and improve performance. In the case of this application, translating trackpoint records to simpler records (containing only latitude, longitude, and wattage) resulted in approximately a 2x speedup on my data.
In the rest of this post, we’ll look at the initial approach and some iterative refinements. For each refinement, we’ll look at the general Spark application performance issues it uncovered as well as the particular speedups I saw. (We’ll also look at a change that I thought would improve performance but didn’t.) First, though, a word of caution: I tried to do meaningful and repeatable timings, but I was working on my personal workstation (hardly a controlled environment) and didn’t do anything resembling a rigorous performance evaluation. Furthermore, your mileage may vary; be sure to thoroughly test improvements on your own applications and data!
The first pass
We’ll be looking at a small function in which PowerBestsApp spends most of its time; this is the function that finds the best n-sample efforts after filtering out all but the best effort starting and ending in a given pair of clusters. Initially, it looks like this:
def bestsForPeriod(data: RDD[Trackpoint], period: Int, app: SLP, model: KMeansModel) = {
  val windowedSamples = windowsForActivities(data, period, stripTrackpoints _).cache
  val clusterPairs = windowedSamples
    .map {case ((activity, offset), samples) => ((activity, offset), (closestCenter(samples.head.latlong, model), closestCenter(samples.last.latlong, model)))}
  val mmps = windowedSamples.map {case ((activity, offset), samples) => ((activity, offset), samples.map(_.watts).reduce(_ + _) / samples.size)}

  val top20 = mmps.join(clusterPairs)
    .map {case ((activity, offset), (watts, (headCluster, tailCluster))) => ((headCluster, tailCluster), (watts, (activity, offset)))}
    .reduceByKey ((a, b) => if (a._1 > b._1) a else b)
    .map {case ((headCluster, tailCluster), (watts, (activity, offset))) => (watts, (activity, offset))}
    .sortByKey(false)
    .take(20)

  app.context.parallelize(top20, app.context.defaultParallelism * 4)
    .map {case (watts, (activity, offset)) => ((activity, offset), watts)}
    .join (windowedSamples)
    .map {case ((activity, offset), (watts, samples)) => (watts, samples)}
    .collect
}
When we invoke bestsForPeriod, we already have an RDD of trackpoint records corresponding to every sample in every activity under consideration. We have also optimized a set of k cluster centers (I typically use k=128 for my data) and have the resulting model available. In the body of this function, we:
- calculate and cache the overlapping sample windows for each activity, converting each Trackpoint record (with the stripTrackpoints function, declared elsewhere) into a minimal record consisting only of latitude, longitude, and wattage (line 2),
- we declare clusterPairs, an RDD of pairs in which window identifiers (that is, activity-offset pairs) are the keys and pairs of cluster centers (one for the starting cluster and one for the ending cluster) are the values (lines 3–4),
- we declare mmps, an RDD of pairs in which the keys are window identifiers and the values are mean wattages for the corresponding window of samples (line 5),
- we then find the top twenty efforts, only considering the best effort starting and ending in a given pair of clusters:
  - first, we join mmps with clusterPairs, giving us an RDD mapping from window identifiers to tuples of mean wattages and start-end cluster pairs (line 7),
  - we transform this mapping so that it’s keyed by the start-end clusters of each window (line 8),
  - and reject all but the best effort for each start-end cluster pair (line 9),
  - we then re-key the resulting pairs by mean wattage (line 10),
  - and sort by mean wattage in descending (hence false) order (line 11);
  - now, up until this point, we haven’t actually done anything besides construct a directed acyclic graph of RDD dependencies, since we’ve exclusively invoked RDD transformations, which are lazy.5 However, in line 12, we’re going to force computation of the mean wattage-window identifier pairs and send the twenty best ones back to the driver program,
- we convert the collection of pairs back to an RDD (line 14; note that we’re assuming low default parallelism or possible expansion to consider more than 20 best efforts), and immediately transpose each pair to re-key by window identifier (line 15),
- join this collection with the windows themselves (line 16), resulting in a map from window identifiers to tuples of mean wattages and the samples themselves6 that contains only our best 20 efforts, and
- strip the window identifier from each tuple, leaving only pairs of wattages and windows (line 17) before firing off a Spark job to actually do the computations we described in lines 14–17, returning the results to the driver program (line 18).
This code runs quickly enough for prototyping, but it could be a lot faster. Experienced Spark programmers likely see several opportunities for improvement here; hopefully the changes we’re about to make will cover most of them.
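One helper worth calling out before we start: closestCenter isn’t defined anywhere in this post. A minimal sketch of what it might do, assuming MLlib’s KMeansModel and the hypothetical Coordinates record sketched earlier (the real implementation may differ), is:
import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.linalg.Vectors

// Ask the k-means model which of its cluster centers a coordinate pair is nearest to.
def closestCenter(latlong: Coordinates, model: KMeansModel): Int =
  model.predict(Vectors.dense(latlong.lat, latlong.lon))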
Broadcasting: a simple win
The first and easiest change we can make is to ensure that we aren’t passing too much data in closure environments, since that data has to be serialized and shipped to workers (in my case, on a single multiprocessor workstation, but potentially across a cluster). Line 4 in our original function relied on having a KMeansModel available in the closure’s environment. Now, a k-means model for 128 cluster centers isn’t huge by any means, but it’s also not a trivial thing to have to serialize along with a relatively small closure. By passing the model in a Spark broadcast variable instead, we can reduce the cost of creating and executing the RDD transformations that depend on it:
def bestsForPeriod(data: RDD[Trackpoint], period: Int, app: SLP, model: Broadcast[KMeansModel]) = {
  val windowedSamples = windowsForActivities(data, period, stripTrackpoints _).cache
  val clusterPairs = windowedSamples
    .map {case ((activity, offset), samples) => ((activity, offset), (closestCenter(samples.head.latlong, model.value), closestCenter(samples.last.latlong, model.value)))}
  // ... rest of function omitted
}
Note that all I did here was change bestsForPeriod to expect a Spark broadcast variable instead of a raw KMeansModel, and then pass model.value to the closestCenter function where it had previously expected a KMeansModel. This very simple change resulted in an approximately 1.1x speedup on my code. If you have large read-only data (whether bigger models or anything else) that needs to go to Spark workers with closure arguments, you might see even bigger improvements.
To create a broadcast variable, simply call the broadcast method of SparkContext with the value you wish to broadcast as a parameter:
scala> val evenNumbers = (0 to 100000 by 2).toArray
evenNumbers: Array[Int] = Array(0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, ...

scala> val broadcastEvens = sc.broadcast(evenNumbers)
14/09/09 14:45:40 INFO storage.MemoryStore: ensureFreeSpace(200048) called with curMem=0, maxMem=9239474995
14/09/09 14:45:40 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 195.4 KB, free 8.6 GB)
broadcastEvens: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
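Once a value has been broadcast, workers read it through the handle’s value method, so the closures that ship to each task carry only a small reference rather than the full payload. A minimal usage sketch, assuming a REPL session like the one above with sc as the SparkContext:
val evenSet = sc.broadcast((0 to 100000 by 2).toSet)
val evenCount = sc.parallelize(1 to 1000000)
  .filter(n => evenSet.value.contains(n))  // evenSet.value is read on the worker
  .count()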
Eliminating counterproductive caching
One of the big benefits to using Spark over other distributed computing frameworks is that intermediate results can be cached in cluster memory. However, caching can be both a gift and a curse: it’s always possible to cache something, even something you use many times, whose impact on your memory usage far outweighs the cost of recomputing it. In our case, the largest collection we deal with is the windowedSamples RDD, which, depending on the window size, can be hundreds of times larger than the input data. Actually computing the windows is not particularly expensive, they aren’t used many times, and we use only a tiny fraction of the windows at the end of the function in any case. By eliminating the .cache call on windowedSamples, we acknowledge that it is cheaper to recompute the few windows we’ll need to examine more than once than it is to dedicate the memory necessary to keeping the windows around. This change resulted in an approximately 1.75x speedup on my code.
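The change itself is a one-liner; both variants appear verbatim in the code excerpts in this post:
// Before: materialize and cache every overlapping window (potentially hundreds
// of times the size of the input data)
//   val windowedSamples = windowsForActivities(data, period, stripTrackpoints _).cache
// After: drop the .cache call and let Spark recompute the few windows we revisit
val windowedSamples = windowsForActivities(data, period, stripTrackpoints _)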
I hesitate to call this a simple fix, since it requires some insight into your application. But as we’ve already seen with the Trackpoint transformation, memory pressure can make a huge difference to Spark applications. Just because you can cache something doesn’t mean you should. (This is a lesson that users of tabled Prolog systems learn quickly: if you start by tabling – that is, memoizing – every relation in your database, you’ll have a fast, expressive environment until everything explodes and your computer becomes a pile of glowing sand.)
Being sensitive to shuffles
Recall that when Spark actually executes a job, it pipelines as many individual tasks as possible into stages, which can be executed on workers without coordinating or shuffling data.7 Ideally, stages will consist of several tasks to allow for high utilization; intuitively, we want workers to spend far more time computing results than coordinating with the driver. However, when RDD transformations change the keys in a collection of pairs (for example), we force Spark to coordinate between workers, shuffling RDD elements around in order to repartition them.
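As a quick illustration of where shuffle (and thus stage) boundaries come from, here is a generic word-count sketch that is not part of this application (the input path is hypothetical):
// map and flatMap pipeline into a single stage: each element stays on its partition.
val words = sc.textFile("hdfs:///tmp/input.txt").flatMap(_.split(" "))
// reduceByKey changes how the data is keyed, so Spark must shuffle records between
// workers to bring equal keys together; a new stage begins at this boundary.
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)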
An interesting non-improvement
Since our application deals with some fairly large collections (windowedSamples), shuffling RDDs can be especially painful. However, because of the way we want to use the data, it’s hard to eliminate all of the shuffles in our method. So instead, we’ll examine the impact of the shuffle by looking at another version of bestsForPeriod that starts by fusing the two transformations on windowedSamples (from lines 3–5 in our original excerpt) into one transformation:
def bestsForPeriod(data: RDD[Trackpoint], period: Int, app: SLP, model: Broadcast[KMeansModel]) = {
  val windowedSamples = windowsForActivities(data, period, stripTrackpoints _)
  val bests = windowedSamples.map {
    case ((activity, offset), samples) =>
      (
        (closestCenter(samples.head.latlong, model.value), closestCenter(samples.last.latlong, model.value)),
        (samples.map(_.watts).reduce(_ + _) / samples.size, (activity, offset))
      )
  }

  val top20 = bests.reduceByKey ((a, b) => if (a._1 > b._1) a else b)
    .map {case ((headCluster, tailCluster), (watts, (activity, offset))) => (watts, (activity, offset))}
    .sortByKey(false)
    .take(20)

  app.context.parallelize(top20, app.context.defaultParallelism * 4)
    .map {case (watts, (activity, offset)) => ((activity, offset), watts)}
    .join (windowedSamples)
    .map {case ((activity, offset), (watts, samples)) => (watts, samples)}
    .collect
}
This transformation fusion essentially eliminates one of the map transformations and also a join, giving us a collection of tuples of start-end cluster pairs and pairs of mean wattages and window identifiers. Interestingly, eliminating these transformations made the application slower! In fact, it ran approximately 19% slower than the previous version. However, this code affords more opportunities to eliminate re-keying and shuffles, and in so doing, we can once again achieve comparable performance to the previous version.
Here’s where the code wound up after several transformations:
def bestsForPeriod(data: RDD[Trackpoint], period: Int, app: SLP, model: Broadcast[KMeansModel]) = {
  val windowedSamples = windowsForActivities(data, period, stripTrackpoints _)
  val bests = windowedSamples.map {
    case ((activity, offset), samples) => (
      (closestCenter(samples.head.latlong, model.value), closestCenter(samples.last.latlong, model.value)),
      ((activity, offset), samples.map(_.watts).reduce(_ + _) / samples.size)
    )
  }.cache

  val top20 = bests.reduceByKey ((a, b) => if (a._2 > b._2) a else b)
    .map { case ((_, _), keep) => keep }
    .takeOrdered(20)(Ordering.by[((String, Int), Double), Double] { case ((_, _), watts) => -watts})

  app.context.parallelize(top20, app.context.defaultParallelism * 4)
    .join (windowedSamples)
    .map {case ((activity, offset), (watts, samples)) => (watts, samples)}
    .collect
}
Note that I’ve eliminated a lot of tuple transposition and other unnecessary re-keyings. In particular, one of the biggest wins was to use the takeOrdered method with an explicit Ordering to avoid re-keying tuples on mean wattages in order to use take(n). (There are still some tuple re-keyings that could be eliminated; in particular, the final map before the collect could probably be handled more efficiently in the driver in many cases, but we’re talking about marginal gains at this point.)
After these changes, the fused-transformation implementation performed almost exactly as well in terms of wall-clock time as the previous implementation. Since the prior implementation was slightly higher-level, I decided I’d prefer it as a basis for future efforts, but I kept the takeOrdered instead of take(n), which resulted in a small (~5%) speedup.8
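To see the takeOrdered change in miniature, outside the context of this application, here’s a generic sketch over an RDD of integers (assuming a SparkContext named sc):
// Two ways to get the 20 largest values: takeOrdered avoids re-keying the data
// and sorting it just to take a prefix.
val nums = sc.parallelize(1 to 1000000)
val viaSort    = nums.map(n => (n, ())).sortByKey(false).take(20).map(_._1)
val viaOrdered = nums.takeOrdered(20)(Ordering[Int].reverse)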
Lazily materializing sample windows
The biggest improvement I made was the most invasive, but it addressed the most obvious deficiency of the initial implementation: specifically, it’s terribly wasteful to store and shuffle sample windows that are relatively huge in the aggregate but cheap to compute and unlikely to be needed.
Instead of keeping a collection of sample windows keyed by window identifiers, I elected to summarize efforts by recording Effort structures containing mean wattage, activity name, and starting and ending timestamps. In order to do this, I generalized the sliding-window behavior from my ActivitySliding trait, allowing clients to specify their own transformations for individual windows:
trait ActivitySliding {
  import scala.reflect.ClassTag

  import org.apache.spark.rdd.RDD
  import com.freevariable.surlaplaque.data.Trackpoint

  def windowsForActivities[U](data: RDD[Trackpoint], period: Int, xform: (Trackpoint => U) = identity _) = {
    applyWindowed(data, period, {case (activity, samples, offset) => ((activity, offset), samples.map(xform))})
  }

  def applyWindowed[U: ClassTag](data: RDD[Trackpoint], period: Int, xform: ((String, Seq[Trackpoint], Int) => U)) = {
    val pairs = data.groupBy((tp: Trackpoint) => tp.activity.getOrElse("UNKNOWN"))
      .flatMap {
        case (activity: String, stp: Seq[Trackpoint]) =>
          (stp sliding period).zipWithIndex.map { case (s, i) => xform(activity, s, i) }
      }
    pairs
  }

  def applyWindowedNoZip[U: ClassTag](data: RDD[Trackpoint], period: Int, xform: ((String, Seq[Trackpoint]) => U)) = {
    val pairs = data.groupBy((tp: Trackpoint) => tp.activity.getOrElse("UNKNOWN"))
      .flatMap {
        case (activity: String, stp: Seq[Trackpoint]) => (stp sliding period).map { xform(activity, _) }
      }
    pairs
  }

  def identity(tp: Trackpoint) = tp
}
I was then able to use the applyWindowedNoZip function to generate a collection of tuples in which the keys were start-end cluster pairs and the values were Effort structures. I still needed to filter the whole dataset to find the few windows I cared about after identifying the best efforts for each pair of clusters, but that was pretty straightforward. However, I now needed to make sure I cached that RDD of every trackpoint before entering bestsForPeriod in order to avoid reloading them from disk multiple times! Here’s what the final code looked like:
def bestsForPeriod(data: RDD[Trackpoint], period: Int, app: SLP, model: Broadcast[KMeansModel]) = {
  val clusteredMMPs = applyWindowedNoZip(data, period, {
    case (activity: String, samples: Seq[Trackpoint]) =>
      (
        (closestCenter(samples.head.latlong, model.value), closestCenter(samples.last.latlong, model.value)),
        Effort(samples.map(_.watts).reduce(_ + _) / samples.size, activity, samples.head.timestamp, samples.last.timestamp)
      )
  })

  clusteredMMPs.reduceByKey ((a, b) => if (a.mmp > b.mmp) a else b)
    .takeOrdered(20)(Ordering.by[((Int, Int), Effort), Double] { case (_, e: Effort) => -e.mmp })
    .map {
      case (_, e: Effort) => (
        e.mmp,
        data.filter {
          case tp: Trackpoint => tp.activity.getOrElse("UNKNOWN") == e.activity && tp.timestamp <= e.endTimestamp && tp.timestamp >= e.startTimestamp
        }.collect
      )
    }
}
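The Effort record itself isn’t defined in this excerpt; judging by how it’s used above, it’s a small case class along these lines (the field names follow the code above, but the timestamp type is an assumption):
// A sketch of the summary record that replaces materialized sample windows.
case class Effort(mmp: Double, activity: String, startTimestamp: Long, endTimestamp: Long)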
This final version of the code is not particularly more difficult to read than the code we started with, but it is substantially faster; it is over 3.7x faster than the prior version and more than 14x faster than my initial prototype (not shown in this post). There are certainly further improvements to make (even some fairly obvious ones), but an important lesson from this exercise is that Spark not only makes it easy to experiment with novel analytic approaches, it also makes it easy to experiment with improved implementations that still retain their declarative elegance.
Conclusions
This post has shown several simple, orthogonal changes to a Spark application that readily compose together for a huge performance improvement. The changes touched on several broader themes you’ll want to keep in mind as you develop your own Spark applications:
- Avoid shipping large data in closure environments, but instead prefer broadcast variables;
- Don’t cache indiscriminately, but be sure that the cost of added memory pressure is worth the savings of avoiding recomputation;
- Understand Spark’s execution model in order to help Spark help you; and
- Don’t be featured on the next episode of “Big Data Hoarders,” but rather structure your application to embrace laziness and keep only what you need!
Thanks for reading! If you have further tips, you can reply to this post via Twitter.
Footnotes
See this post for a high-level overview of the technique used by this app, along with some discussion of how the data guided the choices I made.↩︎
Well, almost all of the metrics. It doesn’t capture heart rate or cadence because I’m not doing anything with those yet, and it doesn’t include speed sensor data because that can be derived from the coordinates of pairs of trackpoints.↩︎
Xiangrui Meng proposed an implementation of sliding windows for RDDs earlier this year; the discussion in that link provides some good background on why exposing sliding-window functionality might be a bad idea. (Briefly, the issue is that cross-partition windows are necessarily very expensive, so as the window size approaches – or surpasses – the number of elements in a partition, performance tanks.) A sliding window implementation is currently available as an internal API for MLlib algorithms, though.↩︎
This is the most obvious performance pitfall of this approach, and it was obvious from the beginning. Having the windows around, though, made experimenting with my data much easier. Spoiler alert: we’ll look at what happens when we eliminate this liability last.↩︎
Advanced Spark users will also note that this statement involves a small lie, since the zipWithIndex method used in windowsForActivities needs to launch a Spark job to appropriately number records across partitions.↩︎
The samples themselves are important since we want to plot the paths on which the best efforts occurred.↩︎
A great introductory explanation of how Spark executes jobs is Aaron Davidson’s talk from Spark Summit 2014.↩︎
Note that these different approaches may have resulted in similar wall-clock times but could have higher or lower utilization – and thus different ultimate performance characteristics – depending on your environment. Be sure to test possible improvements with your code and data!↩︎