Silex is a small library of helper code intended to make it easier to build real-world Spark applications;1 most of it is factored out from applications we’ve developed internally at Red Hat. We have a couple of long-term goals for the Silex project:

  • We want to make it very easy for us to spin up on new data-processing problems without spending a lot of time dealing with accidental complexity, and
  • we want to have a generally-useful standard library atop Spark that provides primitives and solutions to simplify common tasks and reflects best practices as we discover them.

Part of having a library that people will want to use is having a quality test suite and continuous integration. There are some tricks to developing automated tests for Spark applications, but even keeping those in mind may not be sufficient to let you test your Spark apps in a hosted CI environment. This post will show you how to bridge the gap.

Continuous integration challenges

Since we want to make it easy to review contributions to Silex, I set up Travis CI to watch branches and pull requests. Travis CI is a great service, but if you’re used to running tests locally, you might have some problems running Spark-based tests under hosted CI environments. Here’s what we learned:

  • When running tests locally, creating a Spark context with local[*] (in order to use all of the available cores on your machine) might be a good idea. However, a hosted CI environment may offer you a lot of cores but relatively little memory, so your tests might be killed because each Spark executor has nontrivial memory overhead. Instead, consider limiting your code to use fewer cores.
  • If you’re using Kryo serialization and have set the spark.kryoserializer.buffer.mb property to something large (perhaps because you often have serialization buffer crashes in production), don’t be surprised if you run out of memory while running in CI. Spark doesn’t share serializers between threads, so you could be allocating a huge buffer for each thread even if your test code doesn’t need to serialize anything all that large.2
  • Spark SQL (at least as of version 1.3.1) defaults to creating 200 partitions for shuffles. This is probably a good starting place for real-world data, but it’s overkill for test cases running on a single machine. Furthermore, since each partition has some extra memory overhead, it’s another possible cause of OOMEs.

Fixing the problems

Silex provides an application skeleton trait, and we use a class extending this trait to package up Spark and SQL contexts for test cases. That class isn’t currently part of the public Silex API, but you can see what it looks like here:

excerpt from app.scala
1
2
3
4
5
6
7
8
9
10
private [silex] class TestConsoleApp(val suppliedMaster: String = "local[2]") extends AppCommon {
  override def master = suppliedMaster
  override def appName = "console"

  addConfig( {(conf: SparkConf) => conf.set("spark.kryoserializer.buffer.mb", "2")})

  def appMain(args: Array[String]) {
    // this never runs
  }
}

As you can see, we allow users to specify a Spark master URL but default to using two cores locally. Furthermore, we use the addConfig function from AppCommon — which takes a SparkConf and returns a modified SparkConf — to ensure that our Kryo buffer size is the Spark default of 2 mb, rather than the larger Silex default.

If you’re used to writing test code that exercises Spark, you probably already have boilerplate (using something like ScalaTest’s BeforeAndAfterEach) to set up and tear down a Spark context for each test case. We set the Spark SQL property to control parallelism in data frame and SQL shuffles in the test setup code itself:

excerpt from app.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import org.scalatest._

import com.redhat.et.silex.app.TestConsoleApp

trait PerTestSparkContext extends BeforeAndAfterEach {
  self: BeforeAndAfterEach with Suite =>

  private var app: TestConsoleApp = null

  def context = app.context
  def sqlContext = app.sqlContext

  override def beforeEach() {
    app = new TestConsoleApp()
    System.clearProperty("spark.master.port")

    app.sqlContext.setConf("spark.sql.shuffle.partitions", "10")

    app.context
  }

  override def afterEach() {
    app.context.stop
  }
}

Conclusion

Good CI is one of those things that makes it fun to develop software, but the accidental differences between a hosted environment and your local environment can be totally frustrating. By taking into account the constraints you’re likely to see when running in a resource-limited container on a CI server, you can help to ensure that your tests only fail when your code is actually broken.


  1. “Silex” is Latin for “flint,” which seemed like a good name for something that was intended to help people make things with sparks.

  2. Thanks to Erik Erlandson for noticing this (it even affected him in a test case he was running locally).

  spark, spark sql • You may reply to this post on Twitter or