J. Brisbin

Federation in the cloud with RabbitMQ and batch messages

Our remote locations are connected to the central data warehouse through a fantastically complicated and expensive DSL and partial T1 WAN. The partial T1 was only a 56k connection! Yep, you read that right. We were pumping megabytes of data every day through a 56k connection. Most of those have been converted to DSL now, so that severe bandwidth limitation has been eased, but it’s still a limitation and we still have to be bandwidth-conscious when developing new applications that require transmission of data across the WAN.

To solve this problem with our PostgreSQL installations, the C++ guru we had at the time (who abandoned us for a cushy, well-paying position with a financial services firm on Wall Street) wrote a handy little utility that accepts SQL statements as input, runs them against the database, then zips up the results and sends them back to the requestor. The SQL results had to be zipped up because of the severe bandwidth limitation.

In moving to RabbitMQ, I want to make sure that we still have a way to batch and zip up multiple messages. The batcher distributes the individual messages to the consumers here in the data center, consumes all the results, batches those results up, and sends the whole zip file back to the requestor. It’s a little complicated (I hope not unnecessarily) but a good example of doing “federation” in a cloud (cluster/hybrid cloud/virtual cloud/…who can decide which term fits, anyway?) environment.

The Batch Message Handler

To help me with writing handlers for different message types, I have a generic QueueingMessageListener whose sole responsibility is to process incoming messages and pass them off to the appropriate Handler subclass. One of these handlers works with batch messages.
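The listener-and-handlers arrangement can be sketched in a few lines of Python. This is only an illustration of the dispatch idea, not the actual Java QueueingMessageListener: the class names `Listener`, `SqlHandler`, and `BatchHandler`, the `message_type` attribute, and the string return values are all made up for the example.

```python
class Handler:
    """Base class: each subclass declares the message type it handles."""
    message_type = None

    def handle(self, body):
        raise NotImplementedError


class SqlHandler(Handler):
    message_type = "sql"

    def handle(self, body):
        # Placeholder for real work (running the SQL statement)
        return "sql:" + body


class BatchHandler(Handler):
    message_type = "batch"

    def handle(self, body):
        # Placeholder for real work (unzipping the batch)
        return "batch:" + body


class Listener:
    """Looks up the handler registered for a message type and delegates to it."""

    def __init__(self, handlers):
        self._handlers = {h.message_type: h for h in handlers}

    def on_message(self, message_type, body):
        handler = self._handlers.get(message_type)
        if handler is None:
            raise ValueError("no handler for message type %r" % message_type)
        return handler.handle(body)


listener = Listener([SqlHandler(), BatchHandler()])
result = listener.on_message("batch", "payload")
```

The listener itself never needs to change when a new message type comes along; a new handler just gets added to the list.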

These batch messages don’t have to be created in Java, either. Here’s some Python code which creates a “batch” message to send to my Java-language consumers:


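import json
import zipfile
from datetime import datetime
from io import BytesIO

# The body of the real message: a query, its parameters, and paging hints
dta = {
  'start': 1,
  'limit': 20,
  'datasource': 'as400-development',
  'sql': 'SELECT count(*) AS total FROM MYLIB.MYFILE WHERE CONO=?',
  'params': [ 1 ]
}
jsql = json.dumps(dta)

# Build the zip file in memory (zipfile needs a binary stream)
out = BytesIO()
zfile = zipfile.ZipFile(out, 'w')

# First entry: the entry name doubles as the request id
now = datetime.now()
zinfo_date = (now.year, now.month, now.day, now.hour, now.minute, now.second)
zinfo = zipfile.ZipInfo('test-1', zinfo_date)
zfile.writestr(zinfo, jsql)

# Second entry: same JSON text, different request id
now = datetime.now()
zinfo_date = (now.year, now.month, now.day, now.hour, now.minute, now.second)
zinfo = zipfile.ZipInfo('test-2', zinfo_date)
zfile.writestr(zinfo, jsql)

zfile.close()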

If you’re not familiar with Python, don’t panic! All this code does is serialize a Python dictionary into JSON, create a zip file, and add the same text to two different entries within the zip file. This will ultimately achieve the same end as sending the message two different times, directly to the SQL executor service.
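For more than a couple of entries, the same steps fold naturally into a helper. The following is a minimal sketch, assuming a hypothetical `build_batch` function that takes a mapping of request ids to payloads. It also passes `zipfile.ZIP_DEFLATED`, which is my choice here rather than something the code above does: without it, Python's `zipfile` stores entries uncompressed, which matters if bandwidth is the whole point. The archive is read back at the end as a sanity check.

```python
import io
import json
import zipfile


def build_batch(requests):
    """Zip a {request_id: payload} mapping into bytes, one entry per request."""
    out = io.BytesIO()
    # ZIP_DEFLATED compresses the entries; the default mode only stores them.
    with zipfile.ZipFile(out, "w", zipfile.ZIP_DEFLATED) as zfile:
        for request_id, payload in requests.items():
            zfile.writestr(request_id, json.dumps(payload))
    return out.getvalue()


query = {
    "start": 1,
    "limit": 20,
    "datasource": "as400-development",
    "sql": "SELECT count(*) AS total FROM MYLIB.MYFILE WHERE CONO=?",
    "params": [1],
}
batch = build_batch({"test-1": query, "test-2": query})

# Read the archive back to confirm both entries are present and intact.
with zipfile.ZipFile(io.BytesIO(batch)) as zfile:
    names = zfile.namelist()
    first = json.loads(zfile.read("test-1").decode("utf-8"))
```

The resulting `batch` bytes are what would go out as the message body.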

I call them “batch” messages, but they’re really just zip files containing a text file of JSON-encoded data (the body of the real message) and having a filename in the zip file that corresponds to the “request id” (or correlationId, if you prefer the RabbitMQ-centric terminology). The batcher unzips the message and extracts the contents, creating one new message per zip entry:

ByteArrayInputStream bytesIn = new ByteArrayInputStream(bytes);
ZipInputStream in = new ZipInputStream(bytesIn);
try {
  byte[] buff = new byte[4096];
  ZipEntry entry;
  while (null != (entry = in.getNextEntry())) {
    // The entry name doubles as the request id of the embedded message
    String subRequestId = entry.getName();
    // Copy this entry's bytes; read() returns -1 at the end of the entry
    bytesOut = new ByteArrayOutputStream();
    int bytesRead;
    while ((bytesRead = in.read(buff)) > -1) {
      bytesOut.write(buff, 0, bytesRead);
    }
    // Parse the JSON body so the handler can work with it as a Map
    JsonParser p = new MappingJsonFactory().createJsonParser(bytesOut.toByteArray());
    Map request = p.readValueAs(Map.class);
    if (null != request) {
      // One new message per zip entry
      Message msg = new StandardMessage(null, bytesOut.toByteArray());
      msg.setRequestId(subRequestId);
      msg.setSecurityKey(getSecurityKey());
      msg.setType(getType());
      msg.setRequest(request);
      messages.add(msg);
    }
  }
} catch (IOException e) {
  log.error(e.getMessage(), e);
}
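For anyone who would rather consume batches from Python, the same unzip-and-split step might look roughly like this. It's a sketch only: `split_batch` and the dictionary keys are hypothetical stand-ins for the Java `Message` objects, and the two sample queries exist just so the example runs on its own.

```python
import io
import json
import zipfile


def split_batch(batch_bytes):
    """Return one message dict per zip entry; the entry name is the request id."""
    messages = []
    with zipfile.ZipFile(io.BytesIO(batch_bytes)) as zfile:
        for info in zfile.infolist():
            body = zfile.read(info)
            request = json.loads(body.decode("utf-8"))
            if request is not None:
                messages.append({
                    "request_id": info.filename,
                    "request": request,
                    "body": body,
                })
    return messages


# Build a two-entry batch in memory (sample data, not real queries).
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zfile:
    zfile.writestr("test-1", json.dumps({"sql": "SELECT 1", "params": []}))
    zfile.writestr("test-2", json.dumps({"sql": "SELECT 2", "params": []}))

messages = split_batch(buf.getvalue())
```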

Delegation Execution

The batch message handler doesn’t do any real work itself; it only unzips the messages that are zip files and dumps the individual entries into their respective queues.

This does create a problem, though, with the results. I’ve made it configurable so that a requestor can send a batch message and receive results for each of those embedded requests one-by-one, as they are ready. This is a little easier because the batch message handler doesn’t have to create a temporary queue and listen for results itself.

If you want the results back as a zip file, though, the batch message handler just declares an “anonymous” queue and sets that as the ReplyTo. To use this feature, the requestor has to provide a timeout value; otherwise the batch message handler won’t know how long to wait if it doesn’t receive a response for every request.
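The wait-with-timeout logic can be sketched like this. To keep the example runnable without a broker, a plain `queue.Queue` stands in for the anonymous reply queue; `collect_replies`, its parameters, and the sample reply body are all assumptions made for illustration, not the handler's real code.

```python
import queue
import time


def collect_replies(reply_queue, expected, timeout):
    """Gather up to `expected` replies, giving up once `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    replies = {}
    while len(replies) < expected:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            # Each reply is a (request_id, body) pair in this sketch.
            request_id, body = reply_queue.get(timeout=remaining)
        except queue.Empty:
            break
        replies[request_id] = body
    return replies


reply_queue = queue.Queue()
reply_queue.put(("test-1", b'{"total": 42}'))
# Only one of the two expected replies ever arrives, so this gives up
# once the timeout expires and returns what it has.
partial = collect_replies(reply_queue, expected=2, timeout=0.05)
```

The deadline is computed once up front, so slow replies can't stretch the total wait past the requestor's timeout.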

Assuming all goes well…

… and the batch message handler gets as many responses as the number of requests it sent out, it zips everything into a response file and dumps that zip file into the requestor’s response queue.
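A rough sketch of that final step, assuming the replies have been gathered into a dictionary keyed by request id. The name `zip_replies` is hypothetical, and returning `None` when a response is missing is purely a placeholder of mine; what should happen in that case isn't covered here.

```python
import io
import zipfile


def zip_replies(replies, expected):
    """Return zip bytes for the replies, or None if any response is missing."""
    if len(replies) != expected:
        # Placeholder behavior for the incomplete case (an assumption).
        return None
    out = io.BytesIO()
    with zipfile.ZipFile(out, "w", zipfile.ZIP_DEFLATED) as zfile:
        # Name each entry after its request id so the requestor can match
        # every result to the request that produced it.
        for request_id, body in sorted(replies.items()):
            zfile.writestr(request_id, body)
    return out.getvalue()


# Sample reply bodies, made up for the example.
replies = {"test-2": b'{"total": 7}', "test-1": b'{"total": 42}'}
response = zip_replies(replies, expected=2)
incomplete = zip_replies({"test-1": b"{}"}, expected=2)
```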

Written by J. Brisbin

April 15, 2010 at 8:14 pm

Posted in RabbitMQ, The Virtual Cloud
