ReefPointsBlog

Rails 4.0 Sneak Peek: Queueing

Brian Cardarella

Recently a queueing system was added to Rails. Let's dive in and see how to use it.

Run, baby, run!

The queueing API is very simple. You push an object on to the queue and that object is expected to respond to a run method. Let's take a look:

1
2
3
4
5
6
7
8
class TestJob
  def run
    puts "I am running!"
  end
end

Rails.queue.push(TestJob.new)
=> "I am running!"

For most people, that is pretty much it. The queue is running in a separate thread from the app thread, so your app shouldn't notice any response impact from an expensive job.

The basic queue that comes with Rails is not a long-term solution. The goal here is to establish a common API that more robust queueing systems can plug themselves into. In most cases you shouldn't need to change any of your app code if you want to switch from Resque to Sidekiq. You should take care that the objects you are enqueing can be properly marshalled.

You can even write your own queue, let's take a look at the API of a custom queue

1
2
3
4
5
class MyQueue
  def push(job)
    job.run
  end
end

Then in your application.rb

1
config.queue = MyQueue

This example is straight from the Rails test suite. This will define a queue that does not run jobs asynchronously. As soon as the job is pushed onto the queue it is run. Let's make an actual queue (without relying on the Queue class)

1
2
3
4
5
6
7
8
9
10
11
12
13
class MyQueue
  def initialize
    @queue = []
  end

  def push(job)
    @queue.push(job)
  end

  def pop
    @queue.pop
  end
end

In this example we have implemented a simple queue. You will next need to tell Rails's QueueConsumer to use this queue. You can do this in application.rb with an initializer block:

1
2
3
4
intializer 'start queue consumer' do |app|
  app.queue_consumer = config.queue_consumer.start(app.queue)
  at_exit { app.queue.consumer.shutdown }
end

and if we now push to our new queue:

1
Rails.queue.push(TestJob.new)

...we get nothing. Why? Inspect the QueueConsumer:

1
2
Rails.application.queue_consumer
=> #<Rails::Queueing::ThreadedConsumer @queue=#<MyQueue @queue=[]>, @thread=#<Thread dead>>

So you'll notice that the thread is dead. We can force the queue to process by doing:

1
2
Rails.application.queue_consumer.start
=> "I am running!"

Let's back up to understand what is going on here. First we'll start by looking at ThreadedConsumer#start

1
2
3
4
5
6
7
8
9
10
11
12
def start
  @thread = Thread.new do
    while job = @queue.pop
      begin
        job.run
      rescue Exception => e
        handle_exception e
      end
    end
  end
  self
end

So this thread is only staying alive as long as the @queue.pop returns a truthy value. It's not reasonable or us to keep shoving something into the queue, so let's see what is happening in Queue#pop. For this we'll look at Rubinius' implementation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Retrieves data from the queue.  If the queue is empty, the calling thread is
# suspended until data is pushed onto the queue.  If +non_block+ is true, the
# thread isn't suspended, and an exception is raised.
#
def pop(non_block=false)
  while true
    @mutex.synchronize do
      @waiting.delete(Thread.current)
      if @que.empty?
        raise ThreadError, "queue empty" if non_block
        @waiting.push Thread.current
        @resource.wait(@mutex)
      else
        retval = @que.shift
        @resource.signal
        return retval
      end
    end
  end
end

This now starts to make sense. Queue#pop is an infinite loop that will wait until it has content before each iteration. Our simple MyQueue class would return nil when ThreadConsumer#start is called because there is nothing in the queue and the thread would die. Even if we put something in queue it would pop once, run the job, try to pop againg, then die.

For the sake of simplicity let's just have MyQueue inherit from Queue

1
2
class MyQueue < Queue
end

Now we can run

1
2
Rails.queue.push(TestJob.new)
=> "I am running!"

The queue system in Rails 4.0 is a very simple solution, I'm looking forward to the release and the support for it to be added to many of the leading background job processing libraries.

Keep in mind that as of this writing the master branch is still versioned as 'beta'. This API could change.