Building a Job Queue for a scalable application architecture

How to build a Job Queue for queuing and running all your related tasks in isolation, to decouple and scale microservices, distributed systems, and serverless applications.

Before jumping straight into building the job queue, let's first try and understand what we are building, and why.

In this age of modern software development, almost every application we build to facilitate our business follows a distributed architecture, where different parts of the system interact with each other via some form of communication protocol and, as a whole, make up the entirety of the business and the application. One such form of communication is Message-oriented middleware, aka MOM, which takes care of sending and receiving messages in a distributed setup and ensures that our distributed system feels like a single operational unit from the outside. If you have already worked with such a setup using one of the popular ready-made cloud solutions, ranging from Amazon SQS to Google Cloud Pub/Sub, you are well aware of the context. And if you haven't had the chance, or simply don't get the point of using such a setup, I have a small example to make things clearer.

Let's analyze one part of an e-commerce platform and see what happens when a customer makes a purchase. Assume that the payment and everything else worked seamlessly, and the purchase has reached the stage where the system creates a new order. At this point, there might be a few other essential tasks that the system needs to perform in addition to creating and storing the order in the database, e.g.:

  • send a confirmation email to the customer
  • initiate order processing
  • perform other bookkeeping / synchronization tasks

The above list is not exhaustive, just an indication of the possibilities. On the surface, these tasks might look simple enough to be completed alongside the order creation process. However, depending on the volume of requests your e-commerce platform receives, and the complexity involved in completing each of these tasks, it might take a considerable amount of time to finish all of them for a single order. For example, the confirmation email might need to include some recommendations related to the order for upselling other products, some PDF attachments related to the order, and so on. And initiating order processing might mean communicating with your merchants to start processing the order, or kicking off the process in your own inventory system. The exact list of things to do will vary, but in all cases there will be a significant number of sub-tasks the system needs to perform in order to complete these tasks.

Having a message-oriented architecture can ensure greater scalability and decoupling of your application, with faster response times and better availability.

In a normal request-response style setup, your system would do all these tasks after creating the order, and only then return the response. But from what we described above, this means we are unnecessarily keeping clients waiting before we can even send a successful response. Furthermore, what happens if the order was created successfully in the database but some of the other tasks failed? What do we do in that case: should we send an error response? How do we make sure that we can retry only the specific tasks that failed? How do we build an admin dashboard for our Customer Care / Ops agents to manually retry these tasks? This is where a job queue, or any MOM, can be of significant help and a real value-add. Instead of performing all the tasks at once, we perform only the minimal set of operations and, for the rest, create a task each, i.e. we queue those jobs:

  • create a new order in the database with the required details, and for each newly created order:
      • create a task for the confirmation email
      • create a task for initiating order processing

// example order creation function
async function createOrder(orderDetails) {
  // first up create the order
  const order = await ordersService.mutations.create(orderDetails)

  // now create all tasks needed
  await tasksService.mutations.create([{
    // indicate which service is responsible to handle this task
    type: 'email',
    // indicate which action needs to be performed by the service
    action: 'orderConfirmation',
    // this is the initial state of the task
    // a task could either be `scheduled`, `completed` or `failed`
    // this will allow us to know what pending tasks there are
    // for a given order, and also which tasks are `completed`
    // and which of them `failed`, thus letting us build a
    // simple admin interface around order task management
    state: tasksService.const.TASK_STATE.scheduled,
    // add any additional payload that is needed for
    // successful completion of the task in isolation
    data: {orderId: order.uid}
  }, {
    type: 'inventory',
    action: 'mutations.initiateOrderProcessing',
    state: tasksService.const.TASK_STATE.scheduled,
    data: {orderId: order.uid}
  }, {
    //... add more tasks as needed
  }])

  return order
}
pseudo-code for creating an order in an e-commerce platform
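
To see the response-time benefit in context, here is a minimal sketch of how createOrder could sit behind an HTTP endpoint. Express is assumed here purely for illustration; the point is that the client gets its response as soon as the order and its tasks are persisted, while the actual work happens later in the queue.

const express = require('express')

const app = express()
app.use(express.json())

app.post('/orders', async (req, res) => {
  try {
    // create the order and queue the follow-up tasks;
    // the heavy lifting (emails, order processing, etc.) is
    // deferred to the job queue, so the response stays fast
    const order = await createOrder(req.body)
    res.status(201).json({orderId: order.uid})
  } catch (err) {
    res.status(500).json({error: 'order creation failed'})
  }
})

app.listen(3000)
a hypothetical Express endpoint around the createOrder pseudo-code
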
// sample task processing function
async function processTaskOnCreate(task) {
  const {type, action, state} = task

  // find the service handler that would process the newly created task.
  // the `services` may exist in a distributed setup, as a microservice,
  // as a serverless application, or whatever architecture, it doesn't matter.
  // as long as they can be invoked, that's all we care about
  const taskHandler = _.get(services, `${type}.${action}`)

  try {
    // process the task
    await taskHandler(task)
    // mark the task as completed
    await tasksService.mutations.update(task.uid, {
      state: tasksService.const.TASK_STATE.completed
    })
  } catch (err) {
    // mark the task as failed so that it can later be retried
    await tasksService.mutations.update(task.uid, {
      state: tasksService.const.TASK_STATE.failed
    })
  }
}

// add a handler to process all newly created tasks
// this might be a database trigger of some sort
// and should be triggered whenever a task is created
tasksService.onCreate(processTaskOnCreate)
pseudo-code for processing the tasks aka job queue
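
The onCreate hook above is deliberately left abstract. As one possible sketch, a task service backed by a regular database could simply poll for tasks that are still in the scheduled state; tasksService.queries.findScheduled is an assumed helper here, not part of the pseudo-code above.

// a minimal polling-based sketch of tasksService.onCreate;
// `tasksService.queries.findScheduled` is assumed to return all
// tasks that are still in the `scheduled` state
function onCreate(handler) {
  setInterval(async () => {
    const tasks = await tasksService.queries.findScheduled()
    // a real implementation would need locking or batching here,
    // so that two workers never pick up the same task twice
    for (const task of tasks) {
      await handler(task)
    }
  }, 5000) // poll every 5 seconds
}
one possible (hypothetical) polling-based implementation of tasksService.onCreate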

With the above setup in place, we now have a job queue that processes tasks in isolation. As long as we deploy the task service as a separate entity, preferably in a serverless environment, our application can scale with demand without us having to modify the setup or the application code. And as our business needs evolve, we can keep adding new tasks to our orders and write the corresponding services, all in isolation, while the setup takes care of scalability and bringing the system together.

Bonus

If you are using Firebase and the Google Cloud Platform to build your application, I have some ready-to-use sample code to get you started. Assuming that you have an orders collection and a tasks sub-collection for each order, you can do the following to create a job queue for processing all the tasks related to each of your orders.

Note: The orders collection is just an example; any collection with similar needs would work just fine with this solution.

  1. Create a file named processTaskOnWrite.js for processing the job queue.
const _ = require('lodash')

// ideally you'd define all these handlers in a separate module
const handlers = {
  order: {
    sendConfirmationEmail: () => {
      // invoke email service
    },
    initiateProcessing: () => {
      // invoke order processing service
    }
  }
}

const TASK_STATE = {
  scheduled: 'scheduled',
  completed: 'completed',
  failed: 'failed',
}

const isFailedOrCompleted = state =>
  state === TASK_STATE.completed || state === TASK_STATE.failed

/**
 * This is a generic task processor that can be attached to any Cloud Firestore
 * document having a `tasks/{taskId}` or similar sub-collection.
 * We can push a task object {type, action, state, data} to the sub-collection,
 * attach this method to the `onWrite` trigger, and let the handler take care
 * of invoking the appropriate pre-defined handlers.
 *
 * @example
 * // attach firebase trigger handler for order related task queue processing
 * exports.processOrderTasksQueue = functions.firestore
 *   .document('orders/{orderId}/tasks/{taskId}')
 *   .onWrite(require('./processTaskOnWrite'))
 *
 * // later you can write a task to the sub collection
 * // which will invoke the `order.sendConfirmationEmail` handler,
 * // if defined in the list of task handlers
 * dbUtils.createDocument('orders/xxxx/tasks', {
 *  type: 'order',
 *  action: 'sendConfirmationEmail',
 *  state: 'scheduled',
 *  data: {} // any additional data that you like to pass
 * })
 *
 * @param {Object} change Firebase cloud trigger `change` object
 * @param {Object} context Firebase cloud trigger `context` object
 */
async function processTaskOnWrite(change, context) {
  const isTaskDeleted = !_.get(change, 'after.exists')

  if (isTaskDeleted) {
    const taskId = _.get(change, 'before.id')
    const deletedTask = _.invoke(change, 'before.data') || {}
    const {type, action, state} = deletedTask
    console.log(
      'skip processing deleted task',
      `taskId: ${taskId}, type: ${type}, action: ${action}, state: ${state}`
    )
    return null
  }

  const taskId = _.get(change, 'after.id')
  const task = _.invoke(change, 'after.data') || {}
  const {type, action, state} = task
  const taskHandler = _.get(handlers, `${type}.${action}`)
  const taskDetailsString = `taskId: ${taskId}, type: ${type}, action: ${action}, state: ${state}`

  if (!taskHandler) {
    console.log('skip processing task, no handler exist', taskDetailsString)
    return null
  }

  if (isFailedOrCompleted(state)) {
    console.log('skip processing failed or completed task', taskDetailsString)
    return null
  }

  try {
    console.log('start processing task', taskDetailsString)
    await taskHandler(context, task)
    console.log('end processing task', taskDetailsString)

    console.log('start updating completed state', taskDetailsString)
    await change.after.ref.update({state: TASK_STATE.completed})
    console.log('end updating completed state', taskDetailsString)
  } catch (err) {
    console.error('error processing task', taskDetailsString, err)

    console.log('start updating failed state', taskDetailsString)
    await change.after.ref.update({state: TASK_STATE.failed})
    console.log('end updating failed state', taskDetailsString)
  }
}

module.exports = processTaskOnWrite
processTaskOnWrite.js file in the functions folder
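
For completeness, here is a sketch of how a task could be queued from application code. The dbUtils.createDocument helper referenced in the JSDoc example above could be a thin wrapper around something like this; queueOrderTask is a hypothetical name, and the Firebase Admin SDK is assumed.

const admin = require('firebase-admin')

admin.initializeApp()

// hypothetical helper to queue a task for a given order;
// the write fires the `onWrite` trigger, which then invokes
// the matching pre-defined handler
async function queueOrderTask(orderId, task) {
  return admin.firestore()
    .collection(`orders/${orderId}/tasks`)
    .add({...task, state: 'scheduled'})
}

// queue a confirmation email task for an order
queueOrderTask('xxxx', {
  type: 'order',
  action: 'sendConfirmationEmail',
  data: {}
})
sketch of queuing a task into the tasks sub-collection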

  2. Create the appropriate Cloud Firestore trigger by adding the below code to your functions index.js file.

const functions = require('firebase-functions')
const processTaskOnWrite = require('./processTaskOnWrite')

exports.processOrderTasksQueue = functions.firestore
  .document('orders/{orderId}/tasks/{taskId}')
  .onWrite(processTaskOnWrite)
index.js file inside your functions folder
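
With both files in place, deploy the trigger the usual way with firebase deploy --only functions (or via your existing deployment pipeline).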

That's it! You now have a job queue for processing any task related to your orders.

You can add more handlers and attach the same generic processTaskOnWrite to any document for which you need such a job queue. You can even build a simple UI that shows the list of tasks for an order along with each task's state, and lets your admins toggle the state from failed back to scheduled to retry a failed task (a sketch of that retry action follows below). The possibilities are endless and depend purely on your application needs and business use case. The important takeaway, however, is the concept, which can be applied to many different scenarios.
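
As an illustration of that retry flow, the action behind such an admin UI could be as simple as flipping the task's state back to scheduled; retryTask is a hypothetical helper, sketched here with the Firebase Admin SDK.

const admin = require('firebase-admin')

admin.initializeApp()

// hypothetical admin-side helper to retry a failed task;
// moving the state back to `scheduled` fires the `onWrite`
// trigger again, and the task is re-processed because it is
// no longer in a `failed` or `completed` state
async function retryTask(orderId, taskId) {
  return admin.firestore()
    .doc(`orders/${orderId}/tasks/${taskId}`)
    .update({state: 'scheduled'})
}

retryTask('xxxx', 'task-id-to-retry')
sketch of retrying a failed task by resetting its state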


If you happen to run into issues or spot any mistakes in this article, please feel free to comment and I'll do my best to help you out / correct them.

Happy Coding!