In this post, I’ll briefly introduce fedmsg, the federated message bus developed as part of the Fedora project’s infrastructure, and discuss how to ingest fedmsg data for processing with Spark SQL. While I hope you’ll find the analytic possibilities of fedmsg data as interesting as I do, this post will also cover some possible pitfalls you might run into while ingesting complex JSON data or the contents of large SQL databases into Spark SQL more generally.
fedmsg is one of the coolest things about Fedora’s infrastructure.[1] Because nearly everything that happens in Fedora is logged to a messaging bus, there is a vast amount of public data about the daily operation of a large and thriving open-source community, all in one place.[2] The full list of message topics published on the Fedora project’s fedmsg bus is documented online, but these examples should give you a taste of the kinds of messages available for inspection:
- when a Fedora contributor updates the package build metadata (including RPM specfile, patches, source tarball list, etc.) in git,
- when a package build is started (and then again when the package build succeeds or fails, and when the package build is pushed out to the distribution),
- when a Fedora community member tags a package with keywords,
- when a blog post is syndicated to Fedora Planet,
- when a wiki page is created or updated,
- when an IRC meeting starts or ends,
and so on. Furthermore, these data are accessible either as a stream of messages (as they are generated), as JSON-encoded historical data via a REST interface, or as a PostgreSQL dump of all historical fedmsg activity.
An exhaustive treatment of fedmsg is outside the scope of this post,[4] but I wanted to make it clear that
- open infrastructure for a very large open-source project is an extremely cool thing,
- there is a lot of interesting data available in fedmsg,
- there have been some cool applications built atop fedmsg already (see Fedora Badges for an Xbox-like “achievements” system or Ralph Bean’s animated visualization for a couple of fun examples), and
- there are a lot of possibilities for making sense of this data using Spark, Spark SQL, and Spark Streaming.
Getting the historical data
I wanted to look at a large amount of data to start with, so I began by grabbing a dump of the datanommer database. (Datanommer is a service that records each message on the fedmsg bus in a SQL database.) The nightly dump I got was about 3 GB compressed and expanded to about 40 GB: certainly not “big data,” but too big to fit in main memory on my workstation. I loaded the dump into a new Postgres database and started inspecting it.
The largest table by far is messages,[5] which has the following schema:
[Listing: schema of the messages table]
There are a couple of things to note here:
- the signature fields account for the bulk of the raw data, and
- the _msg field actually contains the string representation of a topic-specific JSON object.
As we’ll see, each of these can lead to some pitfalls when attempting to ingest the data into Spark.
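As a small illustration of the second point, here is a minimal sketch in plain Python (rather than the Scala used for the importer) of what a consumer has to do with such a row. Only the _msg column name comes from the text; the other columns, the topic string, and the payload are invented for the example.

```python
import json

# Hypothetical row: the values are made up; only the "_msg" column name
# comes from the text.
row = {
    "id": 1,
    "topic": "org.example.hypothetical.topic",
    "_msg": '{"commit": {"branch": "master", "stats": {"total": {"lines": 3}}}}',
}


def decode_row(row):
    """Return a copy of the row with the JSON string in "_msg" parsed."""
    decoded = dict(row)
    decoded["_msg"] = json.loads(row["_msg"])
    return decoded


decoded = decode_row(row)
# The stored value is a string; only after decoding is it a nested object.
print(type(row["_msg"]).__name__, type(decoded["_msg"]).__name__)
```

Until the string is decoded, a query engine sees a single opaque text column and cannot reach the nested fields.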
Pitfall 1: Single-node memory limits and Slick projections
Here’s the core of a very simple program that ingests fedmsg records from a local PostgreSQL database, using Slick, and outputs these to JSON files,[6] one for each record,[7] in a local directory (I’m not reproducing the generated code that models datanommer tables and rows here):
[Listing: Slick-based importer that writes one JSON file per record]
Note that the message2json implicit conversion doesn’t do anything with the signature fields. These fields are necessary for critical production applications because anyone can write to the fedmsg bus. However, I’m not planning to make any irreversible decisions based on my analyses of fedmsg data and am thus not currently interested in actually verifying message contents or provenance. Since I’m also not currently interested in wasting memory or performance, I’m going to strip these out of the generated JSON.
Unfortunately, even shipping these fields from the database to the JVM only to ignore them dramatically impacts the performance and memory footprint of our importer, so we’ll want to project these away sooner. The best solution I found for efficiently projecting away nullable fields from a SQL database accessed via Slick involved modifying the generated code to control how case classes were instantiated from the database. The generated code includes definitions of case classes for rows in each table, like this:
[Listing: generated row case class]
The generated code also contains classes describing each table; these include a * method defining the default projection from a tuple to a row-specific case class. In our case, the default projection for the messages table looks like this:
[Listing: default projection for the messages table]
If we modify it, we can ignore the signature fields (essentially returning SQL NULL values instead of enormous encoded objects for these fields):
[Listing: modified default projection for the messages table]
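The same idea can be shown outside of Slick. The following sketch uses Python’s built-in sqlite3 module as a stand-in for PostgreSQL, with a hypothetical four-column table. It only illustrates that substituting NULL for a large column in the query keeps the row shape while leaving the payload behind; it is not the modified Slick projection.

```python
import sqlite3

# In-memory SQLite stands in for PostgreSQL; the table layout is hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages (id INTEGER, topic TEXT, signature TEXT, _msg TEXT)"
)
conn.execute(
    "INSERT INTO messages VALUES (?, ?, ?, ?)",
    (1, "example.topic", "x" * 100_000, '{"ok": true}'),
)

# Default projection: the large signature value is part of every result row.
full = conn.execute(
    "SELECT id, topic, signature, _msg FROM messages"
).fetchone()

# Modified projection: NULL is substituted in the query itself, so the row
# keeps its shape but the large value is never read into the result.
slim = conn.execute(
    "SELECT id, topic, NULL AS signature, _msg FROM messages"
).fetchone()

print(len(full[2]), slim[2])
```

Because the slim row has the same number of columns as the full one, code that expects the original row shape keeps working and simply sees a missing value.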
Now we can fire up a REPL and produce JSON documents from the datanommer data; the following example will take the first 100 rows PostgreSQL gives us and generate documents for them in a local directory:
[Listing: REPL session exporting the first 100 rows]
Pitfall 2: The Lost Caverns (of type inference)
Spark SQL supports Hive-style records and arrays within relational data. An especially cool feature of Spark SQL, though, is that it can infer a HiveQL schema from a collection of JSON documents with complex nested structure (see the jsonFile method of SQLContext, which will operate on a single JSON document or a directory of JSON documents).
If you have a clone of the Spark repository on your machine, the easiest way to try this out is to launch sbt hive/console from within your local clone. You’ll then be able to build a schema from some JSON documents:
[Listing: REPL session inferring a schema from JSON documents]
JSON is convenient for users because it lets us specify untyped objects with no fixed schema. Unfortunately, JSON lets us specify untyped objects with no fixed schema. This means that Spark might have trouble inferring a fixed schema for our data. Consider how type inference works in real programming languages: if a variable can take one of two possible values, its type must be the unification of their types. Sometimes, this means we get a trivial typing (in Scala terms, Any), but in other cases, it means we get a contradiction (in Scala terms, Nothing). When Spark is inferring types for our documents, it will simply raise an exception if it tries to unify two objects with diverging types, as you can see in the following example:
[Listing: schema inference failing on two JSON objects with diverging types]
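The failure mode can be modelled in a few lines. What follows is a toy unifier in plain Python, not Spark’s inference code: the type names, the infer and unify functions, and the two sample documents (including the branch names) are all made up for illustration.

```python
import json


def infer(value):
    """Return a toy schema type for a decoded JSON value."""
    if isinstance(value, bool):  # checked before int: bool is an int subclass
        return "boolean"
    if isinstance(value, (int, float)):
        return "number"
    if isinstance(value, str):
        return "string"
    if value is None:
        return "null"
    if isinstance(value, list):
        element = "null"
        for item in value:
            element = unify(element, infer(item))
        return ("array", element)
    if isinstance(value, dict):
        return {key: infer(item) for key, item in value.items()}
    raise TypeError(f"not a JSON value: {value!r}")


def unify(left, right):
    """Merge two toy types, raising ValueError when they diverge."""
    if left == right:
        return left
    if left == "null":
        return right
    if right == "null":
        return left
    if isinstance(left, dict) and isinstance(right, dict):
        # Records unify field by field; a field missing on one side is nullable.
        return {
            key: unify(left.get(key, "null"), right.get(key, "null"))
            for key in left.keys() | right.keys()
        }
    if isinstance(left, tuple) and isinstance(right, tuple):
        return ("array", unify(left[1], right[1]))
    raise ValueError(f"cannot unify {left!r} with {right!r}")


# Hypothetical documents: "branches" is an array of strings in one and an
# object in the other.
first = json.loads('{"name": "a", "branches": ["f20", "f21"]}')
second = json.loads('{"name": "b", "branches": {"f20": "x"}}')

try:
    unify(infer(first), infer(second))
except ValueError as error:
    print("inference failed:", error)
```

Records with disjoint fields unify without trouble in this model (each missing field simply becomes nullable); it is only a field with two incompatible shapes that has no common type.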
The two JSON objects each contain a field called branches, but in the former it is an array of strings, while in the latter it is an object. In the case of fedmsg data, this problem came up because different message topics used branches to correspond to different (but related) concepts. If we could rename these different cases, we could avoid the diverging types. Since the JSON library we’re using has a facility to transform JSON values based on partial functions, we can do this easily:
[Listing: the renameBranches transformation]
The renameBranches method above will simply translate fields named branches with array-of-strings values to fields named pkg_branches with the same values and translate fields named branches with array-of-objects values to fields named git_branches with the same values. Other fields are ignored by this code.
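For readers who want to experiment without the JSON library, here is a rough Python equivalent of the behavior just described. It is a sketch of the renaming rule, not the renameBranches code itself; the function name rename_branches and the sample documents are hypothetical.

```python
def rename_branches(value):
    """Recursively rename "branches" fields according to the shape of their value."""
    if isinstance(value, list):
        return [rename_branches(item) for item in value]
    if not isinstance(value, dict):
        return value
    renamed = {}
    for key, item in value.items():
        # An empty array gives no hint about its element type, so it is left alone.
        if key == "branches" and isinstance(item, list) and item:
            if all(isinstance(x, str) for x in item):
                key = "pkg_branches"
            elif all(isinstance(x, dict) for x in item):
                key = "git_branches"
        renamed[key] = rename_branches(item)
    return renamed


# Hypothetical documents with the two shapes of "branches".
package_doc = {"msg": {"branches": ["f20", "master"]}}
git_doc = {"msg": {"branches": [{"name": "master"}]}}

print(rename_branches(package_doc))
print(rename_branches(git_doc))
```

After the rewrite, each of the two new field names always carries one shape of value, so the fields no longer compete for a single type during inference.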
Pitfall 3: Too many columns
A related problem comes up when we have JSON objects that use entity names as field names, like in the following example (taken from a message describing a git commit):
[Listing: JSON excerpt whose stats.files object has one field per changed file]
The files object is essentially a collection of objects describing different changed files in the commit (for example, sources), but it is represented as an object with a field for each changed file. Because of this, the inferred schema will have a field in the stats.files object for every single filename that has ever been committed to Fedora’s git repository! (That’s a lot of NULL values in a typical row.)
There are other similar cases in the fedmsg data as well where entity names become field names; these can all be handled by transformation rules in fairly straightforward fashion.
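One such rule, sketched in Python, turns the filename-keyed object into an array of records with a fixed set of fields. It assumes that each per-file value is itself an object; the filename field it introduces, the function name files_to_array, and the sample counters are hypothetical.

```python
def files_to_array(stats):
    """Return a copy of a stats object whose "files" mapping is a list of records."""
    files = stats.get("files")
    if not isinstance(files, dict):
        return stats
    converted = dict(stats)
    # Assumes every per-file value is a dict; the entity name moves from the
    # field name into an ordinary "filename" value.
    converted["files"] = [
        {"filename": name, **details} for name, details in sorted(files.items())
    ]
    return converted


# Hypothetical stats object keyed by filename.
stats = {
    "files": {
        "sources": {"additions": 1, "deletions": 1},
        ".gitignore": {"additions": 1, "deletions": 0},
    },
    "total": {"files": 2},
}

for record in files_to_array(stats)["files"]:
    print(record)
```

With this shape, the inferred schema for files has one array-of-records type with a handful of fields, regardless of how many distinct filenames appear in the data.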
This post should have enough background to help you get started looking at fedmsg data with Spark SQL. In future posts, I’ll discuss some additional correctness and efficiency considerations; demonstrate how to unify warehoused, batched, and streaming data; and introduce some analytic applications involving fedmsg data.
I’ve recently been reading Jay Kreps’ I ♥ Logs, in which Kreps argues (from principle and from experience) for the centrality of the log abstraction in distributed systems. I may write a longer review later, but the argument is sound (and consonant with some of the arguments initially made for basing Fedora’s infrastructure around a messaging bus) and the book is definitely worth checking out!
There are also tables to store the names of users, the names of packages, and to model the relationships between users or packages and fedmsg messages involving those users or packages.
I’m using JSON here because of the rich structure of the topic-specific messages. If you were ingesting flat relational data, it would almost certainly make more sense to use JdbcRDD and do the ingest within Spark.
Please don’t use the above code in production; it’s intended to be simple, not efficient. Producing one file per record makes it easy to see where things are going wrong, but it’s very slow both for this program and for the Spark jobs that you’ll want to write to read these data. Your local filesystem probably won’t be too happy with you, either.