The title totally overemphasizes the topic, but here we go. By default ActiveMessaging will process all your queues in one thread. All messages from all queues will be processes sequentially. This isn't always what you want. Especially in scenarios where you have both long-running tasks being kicked off through messages, and rather short-lived tasks that you just fire and forget.

ActiveMessaging has a rather simple and not-so-obvious way of dealing with that: processor groups. There's some documentation on them, but it doesn't bring out the real beauty of them.

Basically you split your processors in groups, how finely grained is up to you. A simple way would be to just separate long-running from short-lived tasks. You just have to define these in config/messaging.rb:

ActiveMessaging::Gateway.define do |s|
  s.destination :index_update, '/queue/IndexUpdate'
  s.destination :giant_batch_job, '/queue/GiantBatchJob'
  s.processor_group :short, :index_update_processor
  s.processor_group :long, :giant_batch_job_processor
end

Now that you have these, how do you get them to run in different threads? If you just use script/poller start, it will continue to work through all messages from all queues. You need to start each processor group individually:

$ script/poller start ---- process-group=short
$ script/poller start ---- process-group=long

Keep in mind though that you can't stop just one poller for one particular processor group. Running script/poller stop will tear them all down. Which comes in handy during deployments. That way you only have to ensure that all your process groups are started during the deployment, but not about stopping every one of them.

ActiveMessaging will run each group in a separate thread which all are monitored by the poller_monitor. The latter will only be started once, courtesy of the daemons package.

I've been playing around with ActiveMessaging recently. Well, actually more than that. I integrated it into a project for asynchronous processing. It's a pretty neat plugin. We're using StompServer as a message broker, and therefore the Stomp protocol to publish and poll the messages.

Now Stomp is a pretty simple protocol and breaks down when you're trying to deliver "complex" data structures like hashes, arrays or *gasp* objects. That's not a bad thing per se, since we can serialize them with YAML. Of course you could just always do that by hand before publishing a message, but let's face it, that's just tedious.

The author of ActiveMessaging recently added support for filters. They can be run after publishing a message and/or before processing it on the polling side. I hear it clicking on your end, why not use filters to do the serializing work for us? Right on!

Here's a simple filter to serialize the message when it's sent:

class SerializeToYamlFilter < ActiveMessaging::Filter
  attr_accessor :options

  def initialize(options={})
    @options = options
  end

  def process(message, routing)
    if message.body.respond_to?(:to_yaml)
      message.body = message.body.to_yaml
    else
      message.body = YAML::dump(message.body)
    end
  end
end

It uses the to_yaml method mixed in by Rails, if it's available. Otherwise it just dumps the object with the YAML::dump method.

The receiving end is even easier.

class DeserializeYamlFilter < ActiveMessaging::Filter
  attr_accessor :options

  def initialize(options={})
    @options = options
  end

  def process(message, routing)
    message.body = YAML::load(message.body) rescue message.body
  end
end

The filter respects potential deserializing errors and just returns the message body in that case. Otherwise it just loads the objects from the message body. And that's the whole story.

Now you need to configure it in config/messaging.rb and you're good to go:

ActiveMessaging::Gateway.define do |s|
  s.filter :deserialize_yaml_filter, :direction => :incoming
  s.filter :serialize_to_yaml_filter, :direction => :outgoing
end

The benefit? This way you can send more complex data structures (as opposed to just strings) through the broker:

publish :my_queue, :action => 'do_stuff', :with => 'User', :id => 1

But remember to keep it simple. Don't try to stuff large objects through there. Sending over the user itself is very likely not a good idea, even more so when it's an ActiveRecord object.

More to come on a13g and Stomp.