fedmsg data and Spark SQL

spark sql

October 31, 2014

In this post, I’ll briefly introduce fedmsg, the federated message bus developed as part of the Fedora project’s infrastructure, and discuss how to ingest fedmsg data for processing with Spark SQL. While I hope you’ll find the analytic possibilities of fedmsg data as interesting as I do, this post will also cover some possible pitfalls you might run into while ingesting complex JSON data or the contents of large SQL databases into Spark SQL more generally.


fedmsg is one of the coolest things about Fedora’s infrastructure.1 Because nearly everything that happens in Fedora is logged to a messaging bus, there is a vast amount of public data about the daily operation of a large and thriving open-source community, all in one place.2 The whole list of message topics published on the Fedora project’s fedmsg bus are documented here, but these examples should give you a taste of the kinds of messages available for inspection:

  • when a Fedora contributor updates the package build metadata (including RPM specfile, patches, source tarball list. etc.) in git,3
  • when a package build is started (and then again when the package build succeeds or fails, and when the package build is pushed out to the distribution),
  • when a Fedora community member tags a package with keywords,
  • when a blog post is syndicated to Fedora Planet,
  • when a wiki page is created or updated,
  • when an IRC meeting starts or ends,

and so on. Furthermore, these data are accessible either as a stream of messages (as they are generated), as JSON-encoded historical data via a REST interface, or as a PostgreSQL dump of all historical fedmsg activity.

An exhaustive treatment of fedmsg is outside the scope of this post,4 but I wanted to make it clear that

  • open infrastructure for a very large open-source project is an extremely cool thing,
  • there is a lot of interesting data available in fedmsg,
  • there have been some cool applications built atop fedmsg already (see Fedora Badges for an Xbox-like “achievements” system or Ralph Bean’s animated visualization for a couple of fun examples), and
  • there are a lot of possibilities for making sense of this data using Spark, Spark SQL, and Spark Streaming.

Getting the historical data

I wanted to look at a large amount of data to start with, so I began by grabbing a dump of the datanommer database. (Datanommer is a service that records each message on the fedmsg bus in a SQL database.) The nightly dump I got was about 3 gb compressed and expanded to about 40 gb – certainly not “big data,” but too big to fit in main memory on my workstation. I loaded the dump into a new Postgres database and started inspecting it.

The largest table by far is messages,5 which has the following schema:

CREATE TABLE messages (
    id integer NOT NULL,
    i integer NOT NULL,
    "timestamp" timestamp without time zone NOT NULL,
    certificate text,
    signature text,
    topic text,
    _msg text NOT NULL,
    category text,
    source_name text,
    source_version text,
    msg_id text

There are a couple of things to note here:

  • the certificate and signature fields account for the bulk of the raw data, and
  • the _msg field actually contains the string representation of a topic-specific JSON object.

As we’ll see, each of these can lead to some pitfalls when attempting to ingest the data into Spark.

Pitfall 1: Single-node memory limits and Slick projections

Here’s the core of a very simple program that ingests fedmsg records from a local PostgreSQL database, using Slick, and outputs these to JSON files,6 one for each record,7 in a local directory (I’m not reproducing the generated code that models datanommer tables and rows here):

// https://github.com/willb/copricapo/blob/examples/fedmsg-blog-1/src/main/scala/com/freevariable/copricapo/Importer.scala
package com.freevariable.copricapo

import scala.slick.direct._
import scala.slick.direct.AnnotationMapper._
import scala.slick.driver.PostgresDriver
import scala.slick.driver.PostgresDriver.simple._
import scala.slick.jdbc.StaticQuery.interpolation

import scala.reflect.runtime.universe

import com.freevariable.copricapo.schema._

object Importer {
  import org.json4s._
  import org.json4s.ext._
  import org.json4s.JsonDSL._
  import org.joda.time.DateTime
  import org.json4s.jackson.JsonMethods.{parse => parseJson, render => renderJson, compact}
  object JSONImplicits {
    private def tupleIfPresent[T](name: String, v: Option[T])
      (implicit cnv: T => JValue): JObject =
      v.map(some => (name -> some) ~ JObject()) getOrElse JObject()
    implicit def message2json(m: Tables.MessagesRow) = {
      ("id" -> m.id) ~
      ("i" -> m.i) ~
      ("timestamp" -> (new DateTime(m.timestamp.getTime()).toString)) ~
      tupleIfPresent("topic", m.topic) ~
      tupleIfPresent("category", m.category) ~
      ("msg" -> parseJson(m._Msg)) ~
      tupleIfPresent("msg_id", m.msgId) ~
      tupleIfPresent("source_name", m.sourceName) ~
      tupleIfPresent("source_version", m.sourceVersion)
  def apply(dburl: String, output_dir: String, limit: Int = 0) {
    import JSONImplicits._
    new java.io.File(output_dir).mkdir()
    Database.forURL(dburl, driver="org.postgresql.Driver") withSession { implicit session =>
      val messages = TableQuery[Tables.Messages]
      (if (limit == 0) messages else messages.take(limit)) foreach { m =>
        val id = m.id
        val outputFile = s"$output_dir/$id.json"
        println(s"rendering to $outputFile")
        val pw = new java.io.PrintWriter(new java.io.File(outputFile))

Note that the message2json implicit conversion doesn’t do anything with the certificate and signature fields.
These fields are necessary for critical production applications because anyone can write to the fedmsg bus. However, I’m not planning to make any irreversible decisions based on my analyses of fedmsg data and am thus not currently interested in actually verifying message contents or provenance. Since I’m also not currently interested in wasting memory or performance, I’m going to strip these out of the generated JSON.

Unfortunately, even shipping these fields from the database to the JVM only to ignore them dramatically impacts the performance and memory footprint of our importer, so we’ll want to project these away sooner. The best solution I found for efficiently projecting away nullable fields from a SQL database accessed via Slick involved modifying the generated code to control how case classes were instantiated from the database. The generated code includes definitions of case classes for rows in each table, like this:

  case class MessagesRow(id: Int, i: Int, timestamp: java.sql.Timestamp, 
    certificate: Option[String], signature: Option[String], 
    topic: Option[String], _Msg: String, category: Option[String], 
    sourceName: Option[String], sourceVersion: Option[String], msgId: Option[String])

The generated code also contains classes describing each table; these include a * method defining the default projection from a tuple to a row-specific case class. In our case, the default projection for the messages table looks like this:

  class Messages(tag: Tag) extends Table[MessagesRow](tag, "messages") {
    def * = (id, i, timestamp, certificate, signature, topic, _Msg, category, sourceName, sourceVersion, msgId) <>
      (MessagesRow.tupled, MessagesRow.unapply)

    // (remainder of class elided)

If we modify it, we can ignore certificate and signature (essentially returning SQL NULL values instead of enormous encoded objects for these fields):

    def * = (id, i, timestamp, None, None, topic, _Msg, category, sourceName, sourceVersion, msgId) <>
      (MessagesRow.tupled, MessagesRow.unapply)

Now we can fire up a REPL and produce JSON documents from the datanommer data; the following example will take the first 100 rows PostgreSQL gives us and generate documents for them in /tmp/scratch:

com.freevariable.copricapo.Importer("jdbc:postgresql:datanommer", "/tmp/scratch", 100)

Pitfall 2: The Lost Caverns (of type inference)

Spark SQL supports Hive-style records and arrays within relational data. An especially cool feature of Spark SQL, though, is that it can infer a HiveQL schema from a collection of JSON documents with complex nested structure (see the jsonFile method of SQLContext, which will operate on a single JSON document or a directory of JSON documents).

If you have a clone of the Spark repository on your machine, the easiest way to try this out is to launch sbt hive/console from within your local clone. You’ll then be able to build a schema from some JSON documents:

// sparkContext is predefined in the Spark Hive console
val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)

// /tmp/scratch contains JSON documents
val docs = sqlContext.jsonFile("/tmp/scratch")

// Inspect the schema

// Register it as a SQL table

// Get the first hundred rows as a Scala array
sqlContext.sql("select * from documents limit 100").collect

// Store a copy as a Parquet file

JSON is convenient for users because it lets us specify untyped objects with no fixed schema. Unfortunately, JSON lets us specify untyped objects with no fixed schema. This means that Spark might have trouble inferring a fixed schema for our data. Consider how type inference works in real programming languages: if a variable can take one of two possible values, its type must be the unification on their types. Sometimes, this means we get a trivial typing (in Scala terms, Any), but in other cases, it means we get a contradiction (in Scala terms, Nothing). When Spark is inferring types for our documents, it will simply raise an exception if it tries to unify two objects with diverging types, as you can see in the following example:

val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
val diverging = sparkContext.parallelize(List("""{"branches": ["foo"]}""", """{"branches": [{"foo":42}]}"""))
sqlContext.jsonRDD(diverging)  // throws a MatchError

The two JSON objects each contain a field called branches, but in the former it is an array of strings, while in the latter it is an object. In the case of fedmsg data, this problem came up because different message topics used branches to correspond to different (but related) concepts. If we could rename these different cases, we could avoid the diverging types. Since the JSON library we’re using has a facility to transform JSON values based on partial functions, we can do this easily:

  def renameBranches(msg: JValue) = {
    msg transformField {
      case JField("branches", v @ JArray(JString(_)::_)) => ("pkg_branches", v)
      case JField("branches", v @ JArray(JObject(_)::_)) => ("git_branches", v)
  def transformObject(record: JValue) = {
    record transformField {
      case JField("_msg", msg) => ("_msg", renameBranches(msg))

The renameBranches method above will simply translate fields named branches with array-of-strings values to fields named pkg_branches with the same values and translate fields named branches with array-of-objects values to fields named git_branches with the same values. Other fields are ignored by this code.

Pitfall 3: Too many columns

A related problem comes up when we have JSON objects who use field names as entity names, like in the following example (taken from a message describing a git commit):

  "stats" : {
    "files" : {
      ".gitignore" : { "deletions" : 0, "insertions" : 1, "lines" : 1 },
      "cmake-fedora.spec" : { "deletions" : 3, "insertions" : 3, "lines" : 6 },
      "sources" : { "deletions" : 1, "insertions" : 1, "lines" : 2 }
    "total" : { "deletions" : 4, "files" : 3, "insertions" : 5, "lines" : 9 }

Note that files is essentially a collection of objects describing different changed files in the commit (.gitignore, cmake-fedora.spec, and sources), but it is represented as an object with a field for each changed file. Because of this, the inferred schema will have a field in the stats.files object for every single filename that has ever been committed to Fedora’s git repository! (That’s a lot of NULL values in a typical row.)

There are other similar cases in the fedmsg data as well where entity names become field names; these can all be handled by transformation rules in fairly straightforward fashion.

What’s next

This post should have enough background to help you get started looking at fedmsg data with Spark SQL. In future posts, I’ll discuss some additional correctness and efficiency considerations; demonstrate how to unify warehoused, batched, and streaming data; and introduce some analytic applications involving fedmsg data.


  1. I’m most familiar with Fedora, but the Debian community has also begun to use fedmsg to publish events about their infrastructure.↩︎

  2. I’ve recently been reading Jay Kreps’ I ♥ Logs, in which Kreps argues (from principle and from experience) for the centrality of the log abstraction in distributed systems. I may write a longer review later, but the argument is sound (and consonant with some of the arguments initially made for basing Fedora’s infrastructure around a messaging bus) and the book is definitely worth checking out!↩︎

  3. See the distgit source for the fedmsg package for an example.↩︎

  4. For more information, read the docs or check out the slides from Ralph Bean’s client-oriented or overview presentations.↩︎

  5. There are also tables to store the names of users, the names of packages, and to model the relationships between users or packages and fedmsg messages involving those users or packages.↩︎

  6. I’m using JSON here because of the rich structure of the topic-specific messages. If you were ingesting flat relational data, it would almost certainly make more sense to use JdbcRDD and do the ingest within Spark.↩︎

  7. Please don’t use the above code in production; it’s intended to be simple, not efficient. Producing one file per record makes it easy to see where things are going wrong, but it’s very slow both for this program and for the Spark jobs that you’ll want to write to read these data. Your local filesystem probably won’t be too happy with you, either.↩︎