J. Brisbin

Just another Wordpress.com weblog

Rapid Application Prototyping with Groovy and RabbitMQ

with 4 comments

UPDATE: Groovy’s Guillaume Laforge offered some very good improvements to make this more Groovy-centric. Thanks Guillaume!

I’m knee-deep in a number of new RabbitMQ-backed virtual cloud utilities and one of the things I’m finding is that these asynchronous, messaging-oriented applications are pretty hard to visualize. In writing my vcloud session manager, I was finding I couldn’t really see in my head what was happening when I dumped a message into queue “X”, or why it wasn’t showing up in consumer “Y”. I wrote the Groovy DSL for RabbitMQ to help me with this. Now I want to show you a more in-depth example of how I’m using the DSL to rapidly prototype applications and help me see, in concrete terms, how exchanges, queues, and messages are wired together.

One of the hurdles I’m having to overcome with all these virtual cloud components (a dozen or more virtual machines that do various things) is keeping configuration files in sync. At the moment I’m using a combination of cron jobs and shared NFS mounts. Neither of these is what I’m really after, though. Sure, they work, but they’re not very dynamic and they suffer from single points of failure. To get away from that, I’m writing some utilities (some in Python, some in Java) to keep these configuration files in sync. To visualize how this application is going to work, I’m prototyping it in Groovy, using the RabbitMQ DSL.

First Steps

One of the first things I know I need is a membership manager. In my prototype, I’ll use an in-memory array and a very simple “fanout” exchange to listen for membership events:

def members = []
mq.exchange name: "vcloud.config.membership", type: "fanout"

When we get a membership message, we’ll push the node name onto the members stack:

// Manage membership
queue(name: "master.membership", routingKey: "") {

  consume onmessage: {msg ->
    println "Received ${msg.envelope.routingKey} event " +
            "from ${msg.bodyAsString}..."
    if(msg.envelope.routingKey == "join") {
      members << msg.bodyAsString
      println "Members: ${members.toString()}"
      return msg.envelope.routingKey != "exit"
    }
  }

}

Now, I’ll publish a couple membership messages to this queue to see my “println” messages in the console, and see the membership list change. The full Groovy DSL code to do this is:

println "Creating membership exchange"
mq.exchange(name: "vcloud.config.membership", type: "fanout") {

  // Manage membership
  queue(name: "master.membership", routingKey: "") {
    consume onmessage: {msg ->
      println "Received ${msg.envelope.routingKey} event " +
              "from ${msg.bodyAsString}..."
      if(msg.envelope.routingKey == "join") {
        members << msg.bodyAsString
        println "Members: ${members.toString()}"
      }
      return msg.envelope.routingKey != "exit"
    }
  }

  // Join two nodes
  publish routingKey: "join", body: "dev1"
  publish routingKey: "join", body: "dev2"

}

Running this results in the following console output:

Creating membership exchange
Received join event from dev1...
Members: [dev1]
Received join event from dev2...
Members: [dev1, dev2]

This is very helpful because it lets me quickly comprehend how my queues exist under specific exchanges. It lets me visually connect message publishing to message consuming. If I wanted to, I could now take this Groovy code and code a real application against it. Of course, there’s nothing stopping me from simply writing the “real” application entirely in Groovy, either!

More Consumers

Now that I’ve got a skeleton upon which to prototype my membership manager, I can create a couple queues that simulate the actual nodes that will be joining the cloud and listening for configuration file change events.

This exchange will be a topic exchange, so I can send messages to the entire group:

// Listen for incoming config events on test node 1
queue(name: "dev1", routingKey: "dev1") {
  consume tag: "dev1.config", onmessage: {msg ->
    println " ***** INCOMING: ${msg.toString()}"
    if(msg.properties.headers["key"]) {
      println "(dev1) Config change for key: ${msg.properties.headers['key']}"

      // Also let node 2 know about this
      send("vcloud.config.events", "dev2", ["key": "firstconfig"], msg.body)
    }
    return false
  }
}

This dumps the incoming message to stdout and forwards the message on to a second node, which has a virtually identical consumer.

The Completed Prototype

Here’s the complete prototype file, in its entirety:

// Handle errors by simply printing stacktraces
mq.on error: {err ->
  err.printStackTrace()
}

mq {channel ->
  //channel.exchangeDelete("vcloud.config.membership")
  //channel.exchangeDelete("vcloud.config.events")
}

def configMap = [
  "firstconfig": "/etc/first.conf",
  "secondconfig": "/etc/second.conf"
]

def members = []

println "Creating membership exchange"
mq.exchange(name: "vcloud.config.membership", type: "fanout") {

  // Manage membership
  queue(name: "master.membership", routingKey: "") {
    consume onmessage: {msg ->
      println "Received ${msg.envelope.routingKey} event from ${msg.bodyAsString}..."
      if(msg.envelope.routingKey == "join") {
        members << msg.bodyAsString
        println "Members: ${members.toString()}"
      }
      return msg.envelope.routingKey != "exit"
    }
  }

  // Join two nodes
  publish routingKey: "join", body: "dev1".bytes
  publish routingKey: "join", body: "dev2".bytes

}

println "Creating events exchange"
mq.exchange(name: "vcloud.config.events", type: "topic") {

  // Listen for incoming config events on test node 1
  queue(name: "dev1", routingKey: "dev1") {
    consume tag: "dev1.config", onmessage: {msg ->
      println " ***** INCOMING: ${msg.toString()}"
      if(msg.properties.headers["key"]) {
        def configKey = msg.properties.headers["key"].toString()
        println "(dev1) Config change for ${configKey}: ${configMap[configKey]}"

        // Also let node 2 know about this
        send("vcloud.config.events", "dev2", ["key": "firstconfig"], msg.body)
      }
      return false
    }
  }

  // Listen for incoming config events on test node 2
  queue(name: "dev2", routingKey: "dev2") {
    consume tag: "dev2.config", onmessage: {msg ->
      println " ***** INCOMING: ${msg.toString()}"
      if(msg.properties.headers["key"]) {
        def configKey = msg.properties.headers["key"].toString()
        println "(dev2) Config change for ${configKey}: ${configMap[configKey]}"
      }
      return false
    }
  }

  // Send some config messages
  println "Sending 'firstconfig' event..."
  queue(routingKey: "dev1") {
    publish key: "firstconfig", body: "contents of new /etc/first.conf file."
  }

}

send("vcloud.config.membership", "exit", "")

Advertisements

Written by J. Brisbin

April 12, 2010 at 9:46 pm

Posted in RabbitMQ

Tagged with

4 Responses

Subscribe to comments with RSS.

  1. It was actually very comforting to know I wasn’t crazy when I arbitrarily decided I needed to resync configurations to point to healthy servers in the event of an error and picked RabbitMQ to fan out messages to notify queue listeners of a config change event.

    So I decided I needed to use RabbitMQ for robust communication in cloud environments (I was already using ejabberd but wanted something designed more for machines as message consumers) and then luckily before I spent too many days on the solution for my problem (which I hadn’t finished modeling) I came upon your Groovy examples for RabbitMQ. Simply Top Notch.

    On multiple levels, it’s a real example of java code for rabbitmq, so it could work great as a library alone, but with the DSL syntax, it makes doing the mundane parts a cinch. Which for me is the writing of boilerplate java code especially when I’m rusty as hell at it after doing nothing but Erlang. Plus, only the code that is relevant to messaging is what I’m working with. That to me is “Lean and Mean”.

    And I was able to figure out precisely how to implement what I needed. I also learned a lot of Groovy in the process. So just wanted to thank you for developing an insightful and wonderful library and way of approaching and modeling message oriented platforms using rabbitmq that is pretty exciting to me. Also I have to credit Groovy and your decision to use a DSL in the first place, as that actually made it a lot easier for me to understand the logic behind your approach. It made modeling this problem simple and paves the way for me to begin modeling more complex things in a simple repeatable fashion.

    Pedram

    June 6, 2010 at 11:43 am

    • It’s awesome to hear the DSL is working for you! Thanks for taking the time to let me know how you’re using it.

      J. Brisbin

      June 7, 2010 at 4:11 pm

  2. […] This post was mentioned on Twitter by alexis richardson. alexis richardson said: RT @j_brisbin: Writing an example of Rapid Application Prototyping using my Groovy RabbitMQ DSL. <- http://bit.ly/9f7S5K […]

  3. […] Rapid Application Prototyping with Groovy and RabbitMQ « J. Brisbin (tags: groovy dsl rabbitmq messaging) […]


Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: