Quickly visualizing MySQL data

I constantly find myself digging through some of our raw data here at Mate1, looking for patterns or trying to figure out why we are seeing new ones. I usually sift through logs as well as user data, and I join between a few tables. Once I have most of my data I’ll apply transformations to it, group it, compute aggregates, etc. At the end of this I’m usually greeted with MySQL’s usual tabular format which, although quite useful at times, is not the best format for showing trends and distributions. Most of this work is very ad hoc and is not worth having anyone on the analytics team spend time on.

A while back I saw an interesting blog post show up in my RSS feed from Avery about afterquery. Afterquery is pretty damn cool. It pulls JSON data over JSONP from an HTTP server and allows you to apply transforms on it before either showing it as a table or graphing it. I thought to myself I’d love to be able to make use of this tool somehow when I needed to visualize some of my MySQL queries. I’d need a way to easily transform MySQL’s results into JSONP…

I didn’t think much about this until last night, when I needed to visualize the number of billing transactions in a certain inconsistent state over time because Wendy, Felix, and I were wondering why some of our Hive queries were taking a long time (turns out they were running over invalid transactions, which confused the entire query). I ended up pulling the data from MySQL as CSV into a Google Docs spreadsheet and drew a quick graph from it. I’m sure there are better ways to do this, but I had none at my disposal and I don’t run Windows/Excel or any similar tools. I tried running Excel in a virtual machine for a while; that didn’t last too long.

After some googling around I turned up an application I’d seen a few years ago, dbslayer. dbslayer was put together by the New York Times as an HTTP server written in C that connects to MySQL and serves query results as JSON. I figured I could get this thing running and modify it so it returns JSONP instead of a plain JSON object. After a bit of hacking I got it successfully returning JSONP the way afterquery expects it. It works something like this:

http://dbslayer/?query=select * from table limit 10

I had never tried using afterquery before that point, so I downloaded the sources and got it running locally. I then opened it up in my browser and pointed it at my dbslayer server. The URL bar had something like this:

http://afterquery/?url=http://dbslayer/?query=select * from table limit 10
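The two URLs above are shown unescaped for readability. Since one URL is nested inside the other, the SQL and the inner URL generally need percent-encoding. Here is a minimal sketch of building them in Ruby; the helper names are made up for illustration, the host names and the `query` and `url` parameters come from the examples above, and whether a given dbslayer or afterquery build needs exactly this escaping is an assumption.

```ruby
require "erb"

# Hypothetical helper: URL that asks dbslayer to run a SQL query.
def dbslayer_url(sql, host = "dbslayer")
  "http://#{host}/?query=#{ERB::Util.url_encode(sql)}"
end

# Hypothetical helper: afterquery URL that points at the dbslayer URL.
# The inner URL is encoded again so its "?" and "=" survive the nesting.
def afterquery_url(sql, afterquery_host = "afterquery", dbslayer_host = "dbslayer")
  inner = dbslayer_url(sql, dbslayer_host)
  "http://#{afterquery_host}/?url=#{ERB::Util.url_encode(inner)}"
end

sql = "select * from table limit 10"
puts dbslayer_url(sql)
puts afterquery_url(sql)
```

ERB::Util.url_encode is used here because it encodes spaces as %20 rather than "+", which is the form most URL decoders accept.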

When I let that URL go through I saw afterquery’s access logs update, followed by dbslayer’s debugging logs on the console, and finally saw the data render into a table in the browser. Bingo! That meant I could quickly and easily pull any data out of MySQL, pre-process it in MySQL if needed, use afterquery to process it further, and render it in my web browser in a slick and expressive way. Now we’re getting somewhere…

After I got this working I started playing around with different queries and afterquery transforms. I got a series of interesting queries visualized as pie charts, bar graphs, line graphs, stacked graphs, and tree graphs. This stuff was seriously useful and quite easy to get the hang of. One of the annoyances I ran into, though, was that every time I changed the textarea that contained the afterquery query and transforms it would refetch the data from dbslayer, and in turn from MySQL, even if my SQL query did not change. This meant that simply changing afterquery-side options triggered a full round of querying and data delivery each time.

That wasn’t very acceptable if this thing was going to use any shared resources or pull in any fair amount of data (fair… well, a few hundred megs or so…). The first thing I thought of doing was implementing some kind of cache in dbslayer, because I didn’t want to change afterquery (yes, I’d rather change C than JS… I’m old…). I cracked open the source code again and started adding a quick in-memory hash to hold MySQL’s query results. After a bit of hacking I decided to scrap that altogether. dbslayer worked, but I was not going to be making any serious changes to it. Writing in C takes longer and this is not meant to be any “serious” project so… I rewrote dbslayer in Scala on top of Akka Actors for concurrency and Netty for the HTTP layer, and implemented a simple in-memory cache for MySQL’s queries. I called this JsonDB. At the time of this writing it needs some work before it is useful to anyone but myself (I promise to try).
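The caching idea can be sketched in a few lines. This is not JsonDB’s code (that is Scala on Akka and Netty); it is a Ruby illustration with a made-up QueryCache class, keyed on the SQL text, where the block stands in for the real MySQL call. It has no expiry or eviction, so cached results can go stale.

```ruby
# Hypothetical in-memory cache keyed by the SQL text.
class QueryCache
  attr_reader :hits, :misses

  def initialize
    @results = {}
    @hits = 0
    @misses = 0
  end

  # Returns the cached rows for sql, or runs the block once and stores
  # whatever it returns.
  def fetch(sql)
    key = sql.strip
    if @results.key?(key)
      @hits += 1
    else
      @misses += 1
      @results[key] = yield(sql)
    end
    @results[key]
  end
end

cache = QueryCache.new
calls = 0
# Stand-in for the real database call; it only counts how often it runs.
run_query = lambda do |sql|
  calls += 1
  [{ "id" => 1 }, { "id" => 2 }]
end

first  = cache.fetch("select * from table limit 10") { |sql| run_query.call(sql) }
second = cache.fetch("select * from table limit 10") { |sql| run_query.call(sql) }
puts "database calls: #{calls}, hits: #{cache.hits}, misses: #{cache.misses}"
```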

So now we had the ability to goof around in afterquery without terrorizing MySQL with every little change we make - excellent. At this point I decided to pull a bigger data set, since I did not have to worry about hitting the database with every little change… After the data streamed into the browser I started making changes to it. Pulling the data from JsonDB every time helped, but it was still slower than I wanted it to be. Again I found myself in one of those unacceptable situations. I cracked my fingers, ignored my wrist, and wrote a bit more code, this time in afterquery.

I opened up afterquery, looked around the source code, and managed to find my way around it relatively easily. I started by making the UI less reactive to textarea changes, forcing the user to press an update button to reload the graph with the new parameters. The next change I made was in the data fetching phase of afterquery’s processing pipeline. I allowed it to cache the results it fetched from JsonDB for a given query, so it does not have to go out across the network if it does not need to. I then loaded up that mean query, waited, and it rendered. I refreshed the page and the same thing happened, though this time I saw the cache hit on JsonDB. I made a change to the afterquery filter and hit update: no network requests went out in the network inspector, JsonDB was silent, and Chrome’s CPU usage rose while it re-processed the data and re-rendered the tables and graph. Another Bingo! I was now able to do what I had wanted to do from the start. The only hiccup I’ve noticed so far is with extract_regexp, which lets you transform the values of the data in afterquery using a regular expression. It mutates the cached data, which means that the same query cannot run twice on the client. If you temporarily remove that transform after you’ve run the query the first time, it works. I’ll have to make this work somehow.
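One possible way around that kind of problem is to hand each run of the transforms a deep copy of the cached rows, so an in-place transform never touches the cache. The sketch below shows the idea in Ruby rather than in afterquery’s JavaScript; the sample rows and the extract_regexp! helper are made up for illustration and are not afterquery’s implementation.

```ruby
# Rows as they sit in the client-side cache (made-up sample data).
cached_rows = [["2013-01-05 10:00:00", 3], ["2013-02-07 11:30:00", 5]]

# Hypothetical stand-in for a regexp-extracting transform: it overwrites
# column `col` in place with the first capture group of `pattern`.
def extract_regexp!(rows, col, pattern)
  rows.each do |row|
    match = pattern.match(row[col].to_s)
    row[col] = match[1] if match
  end
  rows
end

# Deep copy, so the transform works on its own rows and strings.
def working_copy(rows)
  Marshal.load(Marshal.dump(rows))
end

month = /\A(\d{4}-\d{2})/
first_run  = extract_regexp!(working_copy(cached_rows), 0, month)
second_run = extract_regexp!(working_copy(cached_rows), 0, month)

p first_run
p cached_rows
```

The cost is one extra copy of the data per update, which may matter for the larger result sets mentioned above.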

This is what some of the graphs look like…

I’m going to hack a bit more at both afterquery and JsonDB to fix quirks and smooth out rough edges, then seriously consider building some dashboards at work with them. Visualizing small data sets from MySQL is a ton easier for me now, and it’s fun with afterquery’s transforms. You can find the sources here:

Yearly screenshot (=

  • E17 (released!) with default “Dark” theme
  • gource (git visuals)
  • Terminology (tmux)
  • Audacious

Running on 3 screens (1920x1200) with the right most one rotated.

I ran across one of my older screenshots from around 1999 running Enlightenment (DR14 I think) with Eterm, Electric Eyes, and XMMS.

From Typo to Tumblr

I’ve been meaning to update the Typo install that runs this site but never got the chance to do it. I looked around for a bit trying to find a replacement and finally settled on Tumblr. As a result of that choice I had to cobble together a way to export posts from Typo 5.0.2 to Tumblr.

After a bit of googling around I ran across a few scripts that people wrote that seemed to work up to a certain point and then crash. Some were old and some weren’t compatible with my version of Typo. After a bit more searching and tinkering I found some Ruby code here that allowed me to export the Typo database to a WordPress-compatible XML file, which I was then able to push to Tumblr using their (simpler) V1 API with a bit of Ruby:

require "rubygems"
require "hpricot"
require "net/http"
require "uri"
require "cgi"

def http_get(domain,path,params)
  return Net::HTTP.get(domain, "#{path}?".
    concat(params.collect { |k,v| "#{k}=#{CGI::escape(v.to_s)}" }.join('&'))) if not params.nil?
  return Net::HTTP.get(domain, path)
end

# Parse the WordPress-compatible export produced from the Typo database.
doc = File.open("wp.xml") { |f| Hpricot(f) }

# Post every exported item to Tumblr through the V1 write API.
(doc/"item").each do |item|
  params = {
    :email     => "email",
    :password  => "password",
    :type      => "regular",
    :state     => "published",
    :format    => "markdown",
    :date      => item.at('wp:post_date').innerHTML,
    :tags      => (item/"category").collect{|category| category.innerHTML}.join(","),
    :title     => item.at('title').innerHTML,
    :body      => item.at('content:encoded').innerHTML,
  }

  Net::HTTP.post_form(URI.parse("http://www.tumblr.com/api/write"), params)
end

I’ve finally moved away from that old install - one less thing to worry about (I hope!).

Akka 2.0 actors with Kafka backed durable mailboxes

Another quick hack I put together the other day was Akka 2.0 actors that use Kafka for durable mailboxes. It was more a learning exercise about implementing durable actor mailboxes for Akka than anything else. Perhaps I’ll put it to use one day soon.

The gist is here:

https://gist.github.com/2564040

mod_msg_filter - block/allow message delivery over HTTP for ejabberd

One of the requirements we have for our chat service where I work is the ability to decide whether users are allowed to chat based on business logic that we execute in our permission system. This means that we need to ask the permissions system every time people start to exchange messages. To that effect we’ve implemented mod_msg_filter and figured we’d share it in case anyone else on the list has similar use cases.

Here’s the blurb:

mod_msg_filter allows the filtering of "message"
stanzas across an HTTP service. The URL of the
service must be passed as part of the module's
configuration. Both JIDs and their resources are
passed as part of the query string and the result
is expected to be one of:

<status value="denied">
 <stanza1><error/></stanza1>
 <stanza2><error/></stanza2>
</status>

or:

<status value="allowed">
 <stanza1><noop/></stanza1>
 <stanza2><noop/></stanza2>
</status>

The values of the <error> tags or <noop> tags will
be cached in mnesia using 2 keys that look like:

{bare_jid1, bare_jid2, resource1, resource2}
{bare_jid2, bare_jid1, resource2, resource1}

The <error> tags will then be sent over to both JIDs
if the <status> has a "value" of "denied", otherwise
the original message is let through.

The mnesia cache can be flushed if the ejabberd
server is hit on a request handler that maps to this
module (for example: /mod_msg_filter/). A stanza must
be POSTed that looks like:

<flush jid="user@domain.com">

Note that no resource is included at all. This can be
used if ejabberd is part of a system that has billing
restrictions on chatting but allows presence to go
through all the time.

And you can find the code here.
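To make the two-key caching in the blurb concrete, here is a small model of it in Ruby. The real module is Erlang on top of mnesia; the DecisionCache class and helper names below are hypothetical, and the flush behaviour (dropping every entry that involves the given bare JID) is my reading of the blurb, not something taken from the module’s source.

```ruby
# Hypothetical helper: split "user@domain/resource" into bare JID and resource.
def split_jid(jid)
  bare, resource = jid.split("/", 2)
  [bare, resource.to_s]
end

# The two symmetric keys from the blurb:
# {bare_jid1, bare_jid2, resource1, resource2} and its mirror image.
def cache_keys(jid1, jid2)
  bare1, res1 = split_jid(jid1)
  bare2, res2 = split_jid(jid2)
  [[bare1, bare2, res1, res2], [bare2, bare1, res2, res1]]
end

class DecisionCache
  def initialize
    @entries = {}
  end

  # Store the decision under both keys so either direction finds it.
  def store(jid1, jid2, decision)
    cache_keys(jid1, jid2).each { |key| @entries[key] = decision }
  end

  def lookup(from_jid, to_jid)
    @entries[cache_keys(from_jid, to_jid).first]
  end

  # Assumed flush semantics: drop everything involving this bare JID.
  def flush(bare_jid)
    @entries.delete_if { |key, _| key[0] == bare_jid || key[1] == bare_jid }
  end

  def size
    @entries.size
  end
end

cache = DecisionCache.new
cache.store("alice@example.com/phone", "bob@example.com/web", :denied)
puts cache.lookup("bob@example.com/web", "alice@example.com/phone").inspect
cache.flush("alice@example.com")
puts cache.size
```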

Mate1’s activity feed: Cassandra, Kafka, Netty, Varnish, Akka, Ejabberd *updated*

This is a re-write of the original article that was here. Since then we’ve integrated Akka and Ejabberd.

A while ago we decided to give our users a news-feed-like system that would show them what’s going on on the site and around them. Users on Mate1 can make friends with other users by adding them to their friend list. A user can also express interest in another by flirting with them, asking them for a photo, or attempting various other forms of communication. We decided to keep track of these interests and apply certain algorithms to figure out which of these people we should recommend to you and show you information about. Another part of the system monitors users that are interested in you and will try to recommend these users to you.

As well as keeping track of this data we generate and emit events when anything related to you happens in the system, be it the on-line part or the offline one. These events are produced and fired off in an asynchronous fashion using our EventManager.

The EventManager (its Java compatibility layer is called EventManagerUtil) is a very simple interface built on Akka that provides calls like the following:

object EventManagerUtil {
  val log = Logger.getLogger(this.getClass.getName)

  val WORKERS_MAX_NUM = 24

  val router = EventManager.
    system.actorOf(Props[EventManagerWorker].
     withRouter(RoundRobinRouter(nrOfInstances = WORKERS_MAX_NUM)))

  def publish(msg: EventData) {...}

  def publish(msg: EventDataAvro) {...}

  def subscribe(topicCb: java.util.Map[EventType, SubscribeCallback], groupId: String) {...}

  def resetGroupTopicOffset(group: String, topic: EventType) {...}
}

Every one of those calls is in fact a wrapper around a message to an Akka actor. The actor is not just a single one either; it’s a router with several worker actors behind it. The router distributes work to its workers in a round robin fashion. This allows us to scale the number of workers up and down based on load (this can be done dynamically). We can also change the type of router used so we can allow certain events to propagate faster than others (priority) or have certain workers always handle certain types of events (consistent hashing). The Akka actors then push the event into the queue asynchronously. The EventManager also allows subscribers to tune into events (via a pull model rather than a push model, in order to avoid overloading subscribers or consumers). Subscribers, or event consumers, pull messages, act on them, then ack the message queue and pull more. Subscribers can also “go back in time” by resetting their “offset” in the queue in order to re-process messages. This can be very useful if we discover corruption later down the pipeline or simply need to recalculate parts of the data.
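The pull, ack, and offset-reset cycle described above can be modelled with a toy in-memory log. This is only an illustration in Ruby: the EventLog and Consumer classes are made up, nothing here talks to a real queue, and the actual EventManager is Scala on Akka.

```ruby
# Toy append-only log standing in for the message queue.
class EventLog
  def initialize
    @events = []
  end

  def publish(event)
    @events << event
    self
  end

  # Read up to max events starting at offset; empty when past the end.
  def read(offset, max)
    @events[offset, max] || []
  end
end

# The consumer owns its position in the log.
class Consumer
  attr_reader :offset

  def initialize(log)
    @log = log
    @offset = 0
  end

  # Pull a batch without moving the offset.
  def pull(max = 10)
    @log.read(@offset, max)
  end

  # Acknowledge count processed events by advancing the offset.
  def ack(count)
    @offset += count
  end

  # "Go back in time" to re-process events from an earlier position.
  def reset_offset(offset = 0)
    @offset = offset
  end
end

log = EventLog.new
log.publish("flirt").publish("photo_request").publish("friend_added")

consumer = Consumer.new(log)
batch = consumer.pull(2)
consumer.ack(batch.size)
rest = consumer.pull(2)
consumer.ack(rest.size)

consumer.reset_offset(0)
replayed = consumer.pull(10)
puts replayed.inspect
```

Because pulling and acking are separate steps, a consumer that crashes between them simply sees the same batch again on its next pull.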

The message queue is built using a Kafka cluster. Kafka allows us to have an elastic message queue (brokers can be added or removed) that is highly available. Kafka is a messaging system that was originally developed at LinkedIn to serve as the foundation for LinkedIn’s activity stream and operational data processing pipeline. There is a small number of major design decisions that make Kafka different from most other messaging systems:

  • Kafka is designed for persistent messages as the common case
  • Throughput rather than features is the primary design constraint
  • State about what has been consumed is maintained as part of the consumer not the server
  • Kafka is explicitly distributed. It is assumed that producers, brokers, and consumers are all spread over multiple machines.

After messages are stored in the queue they are then pulled out by several consumers that analyse them and decide what to do with them. Some of these events contribute towards counters. Others signal that our automatic review system has approved or denied certain messages, images, etc. so that other processes can pick them up. We also use these consumers to generate log files and log databases that are later used by our analytics system.

For the purposes of the news feed we have consumers that are tuned in to a bunch of interesting events and changes in the system. These consumers write data based on these events into the permanent data store. The consumers make use of Akka actors and futures to parallelize blocking network requests to services or data stores, and try to be as efficient as possible in order to process as many events as possible so the activity feed is as near real time as it can be. As soon as the data is ready it is pushed into our Cassandra cluster for permanent storage.

In Cassandra we’re storing 4 tiers of activity for each user. These tiers are also “rolled up” in order to provide a more compressed view where we combine events based on user and type (screen real estate is valuable!). The tiers are used to rank events based on their types. This ranking is then taken into account when a roll-up happens in order to display the most important events on top. The 4 tiers translate to 4 wide rows per user. Each row contains a lot of columns having values like:

{
    key: TimeUUID // event time

    {
        from  // user_id
        type // event type
        read // read or unread
    }
}
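As an illustration of what a roll-up over such columns could look like, here is a sketch in Ruby that combines events by sender and type and puts the most recent groups first. The actual roll-up and ranking rules are not spelled out here, so the grouping and ordering below are assumptions, and integer timestamps stand in for the TimeUUID keys.

```ruby
# One column of a tier row: event time plus from/type/read.
Event = Struct.new(:time, :from, :type, :read)

# Combine events that share sender and type into one summary row,
# newest group first (assumed ordering).
def roll_up(events)
  groups = events.group_by { |event| [event.from, event.type] }
  rows = groups.map do |(from, type), group|
    latest = group.map(&:time).max
    unread = group.count { |event| !event.read }
    { from: from, type: type, count: group.size, latest: latest, unread: unread }
  end
  rows.sort_by { |row| -row[:latest] }
end

events = [
  Event.new(100, 7, "flirt", false),
  Event.new(105, 7, "flirt", false),
  Event.new(103, 9, "photo_request", true)
]

roll_up(events).each { |row| puts row.inspect }
```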

All we need to store is just enough data so we can reconstruct the time line. Once this data is stored in Cassandra it is then read out by the website and displayed. In order to keep the interface simple between the website and Cassandra we built a tiny Netty service that has the business logic to fetch the activity feed data and return it to the website in JSON format.

The Netty service supports GZIP compression and sits behind a few Varnish servers that act as a cache. The presence of Varnish in front of Netty means that if we have Cassandra issues due to maintenance or load, we’re saved by the cache (sometimes Cassandra reads can slow down under load since we do not have a separate Cassandra cluster for the roll-up calculation and analytics, yet). The service provides access in a format similar to the following:

http://server/feed?uid=18413598&tier=tier1&start=0&count=40

This allows us to display the activity feed and page around it rather easily. The HTTP interface also allows the website to perform some write operations on the activity feeds (manipulate counters, mark items as read, etc.)
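Paging then comes down to moving the start parameter forward by count. A minimal sketch in Ruby, assuming start is a zero-based item offset; the feed_url helper is hypothetical, while the parameter names and the "server" host come from the example URL above.

```ruby
require "uri"

# Hypothetical helper: build the feed URL for a given zero-based page.
def feed_url(uid, tier, page, per_page = 40, host = "server")
  query = URI.encode_www_form(
    uid: uid,
    tier: tier,
    start: page * per_page,  # assumed to be an item offset
    count: per_page
  )
  "http://#{host}/feed?#{query}"
end

puts feed_url(18413598, "tier1", 0)
puts feed_url(18413598, "tier1", 1)
```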

The latest addition to this system has been Ejabberd. Ejabberd is a Jabber (XMPP) server implemented in Erlang. As we have been working on our mobile Mate1 platform, we decided to integrate real-time notifications for the user. Since Ejabberd speaks XMPP, which a web browser cannot speak directly, we use its BOSH module to allow web browsers to connect to our servers (we are using Ejabberd for these notifications and also for chat). The BOSH protocol defines how arbitrary XML elements can be transported efficiently and reliably over HTTP in both directions between a client and server. The same consumer system that was mentioned before is also used to route events directly to our users in as close to real time as we can, through BOSH, to their mobile devices (or desktops).

At the time of this writing we’re getting ready to finish up and release our mobile platform and will put the latest parts of this system live. Some code snippets that we’ve shared while doing this project (and others) can be found here.

Guzzler: pause, resume, seek

Guzzler can now seek to a binary log file and position, and can start, stop, and restart (from the last good position) streaming from the binary log.

Guzzler with pattern based routing

Guzzler now publishes binlogs and allows pattern based subscriptions using “dbName.tableName.opName” with wild-card support.

dbName is the database, tableName is the table being acted upon, and opName is one of update, insert, or delete.
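To give a feel for this style of routing, here is a small Ruby sketch of matching “dbName.tableName.opName” keys against subscription patterns. It is not Guzzler’s code (Guzzler is Scala); the Router class is made up, and the wild-card syntax, a “*” standing for exactly one segment, is an assumption.

```ruby
# Turn a pattern such as "shop.*.insert" into an anchored Regexp.
# Assumption: "*" matches exactly one dot-free segment.
def pattern_to_regexp(pattern)
  parts = pattern.split(".").map do |part|
    part == "*" ? "[^.]+" : Regexp.escape(part)
  end
  Regexp.new("\\A" + parts.join("\\.") + "\\z")
end

# Hypothetical router: subscribers register a pattern and a handler block.
class Router
  def initialize
    @subscriptions = []
  end

  def subscribe(pattern, &handler)
    @subscriptions << [pattern_to_regexp(pattern), handler]
  end

  # Deliver to every matching subscriber; returns how many were notified.
  def publish(db, table, op, payload)
    key = "#{db}.#{table}.#{op}"
    matching = @subscriptions.select { |regexp, _| regexp.match?(key) }
    matching.each { |_, handler| handler.call(key, payload) }
    matching.size
  end
end

router = Router.new
seen = []
router.subscribe("shop.*.insert") { |key, _| seen << "inserts:#{key}" }
router.subscribe("*.users.*")     { |key, _| seen << "users:#{key}" }

router.publish("shop", "users", "insert", "insert into users ...")
router.publish("shop", "orders", "delete", "delete from orders ...")
puts seen.inspect
```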

Guzzler: Stream MySQL binary logs and consume them with Scala actors and RabbitMQ.

Guzzler allows you to stream MySQL binary logs from a master and lets you act on them using Scala actors (consumers). Consumers are configurable in guzzler.conf along with the rest of the required parameters. Included with Guzzler is a dummy consumer and a RabbitMQ one that will push queries into a RabbitMQ server for consumption.

Consumers, either in Guzzler itself or behind RabbitMQ, can analyze the queries (Guzzler provides an SQL query parser based on ZQL) and may decide to update counters, fire off events, log messages, etc.

https://github.com/mardambey/guzzler

Desktop at home

This is my current home desktop running Ubuntu/GNOME. E17 lives at work right now (=

Shrink, Spiffy, Scala!

Been writing some Scala code; it’s fun, and one hell of a fantastic language. You should take a look at it some time.

Happy new year!

++code ++work ++train

(=

Quick graphs with Perl / GD::Graph

So I had to quickly whip up some graphs today at work based on values coming in from one of our database tables. Nothing better than Perl and GD (GD::Graph) for a quick and effective solution.

use strict;
use DBI;
use GD::Graph::bars;
use GD::Graph::Data;

my $db_host = 'XXXXXX';
my $db_name = 'XXXXXX';   
my $db_user = 'XXXXXX';
my $db_pass = 'XXXXXX';
my $query   = "select month, undelivered from XXXXX where XXXXX";

# create labels and values for x-axis

my ($labels, $values) = get_data_from_sql($db_host, $db_name, $db_user, $db_pass, $query);

# graph and save the output

graph($labels, $values, "Month", "Undelivered", "Undelivered Messages by Month", "undelivered.png");


sub get_data_from_sql($$$$$)
{
  my ($db_host, $db_name, $db_user, $db_pass, $query) = @_;

  my $dbh = DBI->connect("dbi:mysql:$db_name:$db_host", $db_user, $db_pass)
    or die "Couldn't connect to database: " . DBI->errstr;

  my $sth = $dbh->prepare($query)
    or die "Couldn't prepare statement: " . $dbh->errstr;

  $sth->execute()
    or die "Couldn't execute statement: " . $sth->errstr;
  my @row;

  my @labels = ();
  my @values = ();

  while (@row = $sth->fetchrow_array())
  {
    push @labels, $row[0];
    push @values, $row[1];
  }

  $dbh->disconnect;

  return (\@labels, \@values);
}

sub graph($$$$$$)
{
  my ($labels, $values, $x_label, $y_label, $title, $out_file) = @_;

  my $data = GD::Graph::Data->new([$labels, $values,])
    or die GD::Graph::Data->error;

  my $my_graph = GD::Graph::bars->new();

  $my_graph->set(
  x_label => $x_label,
  y_label => $y_label,
  title   => $title,
  bar_spacing => 8,
  shadow_depth => 4,
  shadowclr => 'dred',
  transparent => 0,
  )
  or warn $my_graph->error;

  # Render the graph once and write the PNG to disk.
  my $gd = $my_graph->plot($data) or die $my_graph->error;
  open(my $img, '>', $out_file) or die "Couldn't open $out_file: $!";
  binmode $img;
  print $img $gd->png;
  close($img) or die "Couldn't close $out_file: $!";
}

If you do not want your data coming from an SQL statement simply fill in $labels and $values with anything you want, like:

$labels = ['Monday', 'Tuesday', 'Wednesday'];
$values = [45, 66, 89];

Pretty straightforward and handy. The graph looks like this.