Roll your own Queue Processor in Elixir with GenStage

ELIXIR GENSTAGE POSTGRESQL QUEUE

Roll your own Queue Processor

Surely, Michael, you can’t be serious?

I know you love to roll your own …

But why would you want to roll your own Queue Processor in this day and age when Oban has become the de facto standard in Elixir applications?

Well, let me tell you a story about how I accidentally built QueueLixir and incorporated it into Cassava long before the latter was conceived.

Along the way, I hope you will see my journey as yours and reclaim the freedom to enjoy writing code again without permission or purpose.

You never know what surprises lie in wait.

All Ends have Beginnings

I got introduced to Elixir by a coworker who could not stop raving about this fantastic language built around lightweight processes.

“It sounds interesting” was all my .NET developer identity would allow me to embrace at the time.

A few years passed, and I became increasingly bored with object-oriented programming. I was finally ready to dip my toes into the deep end of the pool that Functional Programming represented in my mind.

Enter “Seven Languages in Seven Weeks.”

Wow, so much candy and so little time. Which should I feast on first?

I eventually settled on Erlang after first kicking the tires of Haskell and quickly sinking to the bottom of the pool, as you can imagine.

But, as with everything in life, an ember was lit, which would one day set fire to the old way of thinking to make room for the new.

More time went by before the itch to try something functional took hold.

“Could I really build an application without holding state?”

Eventually, I got my hands on “Programming Elixir” by Dave Thomas and began my journey to the light.

The question of “What to build” now confronted me.

Enter GenStage and Back Pressure

GenStage was released while I was busy retraining my mind that the equals sign (=) did not just mean assignment but a match asserting that the left-hand and right-hand sides are the same.

With GenStage, I could see how all these processes and their life cycles came into play, which helped to further deepen my understanding of Elixir.

“You know what would be a good exercise of all this power?”

A Queue Manager, just like the Ruby gem Queue Classic, which I thoroughly enjoy for its simplicity and direct use of PostgreSQL.

And so began the march to build what would become known as QueueLixir, without any rhyme or reason beyond a mere desire to see how far I could stretch my understanding of Elixir.

QueueLixir is born

QueueLixir Sketch

My modest Queue Processor began life as a standalone library built around PostgreSQL’s LISTEN/NOTIFY message processing, just like its big brother Queue Classic.

I wanted to keep QueueLixir slim, which meant using Moebius as a PostgreSQL wrapper. It would take a major update before Ecto made an appearance, but that is getting ahead of the story.

QueueLixir began with a modest interface based on three types of logical queues:

  • Immediate
  • Delayed
  • Recurrent

def enqueue(worker, payload, opts)

def enqueue_at(%DateTime{} = datetime, worker, payload, opts)

def enqueue_in({unit, offset}, worker, payload, opts)

def enqueue_every(terms, worker, payload, opts)
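
The delayed variant can be read as sugar over the dated one: resolve the offset to an absolute time, then schedule at that time. Below is a minimal sketch of that resolution step. It assumes `{unit, offset}` pairs a time-unit atom with an integer count; the `RunAt` module, `from_offset/2`, and the list of supported units are hypothetical and not taken from QueueLixir.

```elixir
defmodule RunAt do
  @moduledoc "Hypothetical helper: turns a {unit, offset} tuple into an absolute DateTime."

  # Seconds per supported unit. Which units QueueLixir accepts is an assumption.
  @seconds %{seconds: 1, minutes: 60, hours: 3_600, days: 86_400}

  @spec from_offset({atom, integer}, DateTime.t()) :: DateTime.t()
  def from_offset({unit, offset}, %DateTime{} = now) when is_integer(offset) do
    # Map.fetch!/2 raises on an unknown unit instead of silently scheduling wrong.
    DateTime.add(now, Map.fetch!(@seconds, unit) * offset, :second)
  end
end
```

Passing `now` explicitly, rather than calling `DateTime.utc_now/0` inside the function, keeps the calculation easy to test.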

With these functions in hand, I added a QL.RepoWrapper module that could handle persisting and furnishing work.

All that was left to do was to produce and consume the work. I say all, but that is not to belittle the effort required.

The most intriguing part of working with GenStage was rolling my own GenStage Dispatcher. Here we go again…

GenStage comes with three Dispatchers out of the box:

  • DemandDispatcher (sends batches to the highest demand)

  • BroadcastDispatcher (accumulates demand from all consumers before broadcasting events to all of them)

  • PartitionDispatcher (sends events according to partitions)

None of these fit the bill for what I had in mind. Truth be told, I didn’t have anything concrete at this stage. I wanted the Dispatcher to manage the demand of different types of consumers, as seen below.

%{
    type1: %{
      ref1: %{pid: pid, max: max, capacity: capacity},
      ref2: %{pid: pid, max: max, capacity: capacity}
    },
    type2: %{
      ref1: %{pid: pid, max: max, capacity: capacity},
      ref2: %{pid: pid, max: max, capacity: capacity},
      ref3: %{pid: pid, max: max, capacity: capacity}
    },...
}

The dispatcher is configured by the `:producer` on init as follows:

  {:producer, opts, dispatcher: QL.ScopedDemandDispatcher}

Luckily, the library comes with a GenStage.Dispatcher Behaviour for authoring your own Dispatcher, and this is precisely what I did. With this configuration, I could now have a single Dispatcher manage many types of consumers and deliver work to the one with the highest capacity scoped by type.
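
To make "highest capacity scoped by type" concrete, here is a minimal sketch of the selection step over the nested state shown earlier. The `ScopedPick` module and `pick/2` are hypothetical names; the actual dispatch logic in QueueLixir is not shown in this post and may differ.

```elixir
defmodule ScopedPick do
  @moduledoc "Hypothetical sketch: choose the consumer with the most spare capacity for a type."

  # `state` has the shape type => %{ref => %{pid: _, max: _, capacity: _}}.
  # Returns {ref, entry}, or nil when no consumer of that type can take work.
  def pick(state, type) do
    candidates =
      state
      |> Map.get(type, %{})
      |> Enum.filter(fn {_ref, %{capacity: cap}} -> is_integer(cap) and cap > 0 end)

    case candidates do
      [] -> nil
      _ -> Enum.max_by(candidates, fn {_ref, %{capacity: cap}} -> cap end)
    end
  end
end
```

Consumers whose capacity is still `nil` (subscribed but no demand received yet) are skipped, as are consumers of every other type.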

The implementation of the subscribe action is shown below. We look up the subscriber by type and inject it appropriately into the state.

@doc """
Subscribe a consumer of a given type.

The `:max_demand` should be set during this phase,
otherwise it defaults to the size of the first demand.
"""

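def subscribe(_opts, {pid, ref} = _from, state) do
    # Peek at the consumer's GenStage state to learn which module (type) it runs.
    %GenStage{mod: mod, state: _state, type: :consumer} = :sys.get_state(pid)

    # Register the subscriber under its type, creating the type bucket if needed.
    {_old_state, new_state} =
      state
      |> Map.get_and_update(mod, fn subscribers ->
        {subscribers,
         Map.put(
           subscribers || %{},
           ref,
           # Stored as a map to match the state shape shown earlier.
           %{pid: pid, max: state[:max_demand], capacity: nil}
         )}
      end)

    # No initial demand is reported back to the producer.
    {:ok, 0, new_state}
end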

The overall workflow was one in which the Producer received a notification from Postgresql when new work was added to the system. Immediate work was, well, processed immediately! Delayed and Recurrent work required checking the current wait to see if the new job should be processed ahead of the current one. Once the work was identified, it was dispatched to the consumers.
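
The "check the current wait" step can be sketched as a pure decision function. This is an illustration under assumptions, not the QueueLixir implementation: the `WaitCheck` module, its `check/3` function, and the return values are hypothetical names chosen for this example.

```elixir
defmodule WaitCheck do
  @moduledoc "Hypothetical sketch: decide whether a newly scheduled job preempts the current wait."

  # `current_run_at` is the run time the producer is currently waiting on (nil when idle).
  # Returns {:reschedule, wait_ms} when the new job is due first, otherwise :keep.
  def check(nil, new_run_at, now), do: {:reschedule, wait_ms(new_run_at, now)}

  def check(current_run_at, new_run_at, now) do
    if DateTime.compare(new_run_at, current_run_at) == :lt do
      {:reschedule, wait_ms(new_run_at, now)}
    else
      :keep
    end
  end

  # Never return a negative wait; overdue work is due right away.
  defp wait_ms(run_at, now), do: max(DateTime.diff(run_at, now, :millisecond), 0)
end
```

A producer could feed the returned wait into a timer such as `Process.send_after/3` and cancel the previous timer whenever it receives `{:reschedule, _}`.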

To this end, I created a QL.GenStage macro with a behaviour that all consumers conform to by implementing a single callback.

@callback handle_event(event :: Event.t(), state :: term) ::
          :ok
          | {:ok, value :: term}
          | :retry
          | {:rate_limited, interval :: integer}
          | {:error, reason :: bitstring}

This allows each Consumer to focus on the job at hand, leaving matters such as locking and unlocking work to the capable hands of the Producer. A consumer can indicate whether a job should be retried at a future time because of a transient issue or whether work should be halted for a rate limit.
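
One way to picture the Producer's side of this contract is a function that maps each return value to a follow-up action. The sketch below is an assumption-laden illustration: the `Outcome` module, the action tuples, the default retry delay, and treating the rate-limit interval as milliseconds are all hypothetical and not taken from QueueLixir.

```elixir
defmodule Outcome do
  @moduledoc "Hypothetical sketch: map a consumer's handle_event result to a producer action."

  # Assumed default delay before a retried job runs again, in milliseconds.
  @retry_ms 30_000

  def next_action(:ok), do: :complete
  def next_action({:ok, _value}), do: :complete
  # Transient failure: unlock the job and run it again later.
  def next_action(:retry), do: {:reschedule, @retry_ms}
  # Rate limited: hold further work for this consumer type until the interval passes.
  def next_action({:rate_limited, interval}) when is_integer(interval), do: {:pause, interval}
  # Permanent failure: record the reason and stop retrying.
  def next_action({:error, reason}), do: {:fail, reason}
end
```

Keeping this mapping in one place is what lets consumers stay free of locking and scheduling concerns.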

Below is a sample of how consumers can be configured.

config :cassava_runtime, :consumers, [
  {CassavaRuntime.RoundDistributionConsumer, [], 10},
  {CassavaRuntime.RoundPreviewConsumer, [], 3},
  {CassavaRuntime.ContentQueryConsumer, [], 5},
  {CassavaRuntime.AssessmentConsumer, [], 5},
  {CassavaRuntime.RefreshStatsConsumer, [], 2},
  {CassavaRuntime.GenerateDistributionSlotsConsumer, [], 5}
]

Synchronicity

Fate has a rather peculiar way of showing up in life.

At some point, I placed QueueLixir on the shelf and took my attention elsewhere.

A year or so later, the idea of Cassava came to mind. One day, when I was deep in the weeds trying to figure out how to schedule content for audience members, I thought of a queue.

But not just any queue!

It was QueueLixir’s time to shine. You see, the little engine written without a purpose was poised to fit ever so neatly into the fold - like it was meant to be from the start.

Synchronicity is the name given to the phenomenon of meaningful coincidences.

Everything I had implemented, the scoped Dispatcher included, was now paying dividends. It even sparked the idea for a rate limiter that could be applied independently to channels (e.g., Slack vs Microsoft Teams) and to organizations within each channel.

Today, Cassava runs QueueLixir in two modes. A shared processor is used for all tenants on lower subscription plans and dedicated processors for those on the premium plan. These processors are booted on startup as well as on the fly.

None of this reliance on QueueLixir was planned in advance. Had it been, I would have reached for a solution off someone else’s shelf. But fortune shone on one I already had in place and could reason about. All that was left was to push all the chips in on me.

If you advance confidently in the direction of your dreams, daring to live the life that you have imagined, you will meet with success unexpected in common hours. -- Henry David Thoreau

Summary

Here we are at the end of my story on how an unintentional decision led me down the path of rolling my own modest Queue Processor, which now profoundly serves my needs. Can it handle a million records without bogging down? Probably not. Can it handle the ones I have now? Yep.

In the end, the question is not whether you should roll your own, but whether you should listen to that little voice inside seeking expression and trust it, regardless of what the outside world says is the common-sense thing to do. And remember, common sense does not equate to common practice, to steal a quote from Brendon Burchard.

I plan to tease out more learnings from this adventure in the future. If you want to see more of the code, I may publish QueueLixir on GitHub and HexDocs as is.

Your feedback is welcomed as always.

Michael

