Delayed Published Posts in KeystoneJS

January 31, 2016    node nodejs cms tutorial javascript js keystone keystonejs worker

It goes without saying that having the ability to schedule posts is somewhat of a must-have functionality in a CMS - at least that's what I've noticed or heard.

Before I begin, note that this is a poor man's implementation of a post scheduler. In my opinion, it is not scalable and should not be used if you have 50+ people scheduling posts at any given time. This way of doing things does have the benefit of not having to poll the system every minute or so depending on how accurate you would like your workers to be. Time is always a b!@**! Additionally, I went with this method because I'm limited by the machine's resources I'm working on - a t2.medium instance on AWS. Boo!

We are going to be using Kue to implement this feature. Kue, for those new to it, is a Redis-backed job queue for NodeJS. Setting up a new job and processing it is easy, and it even comes with a JSON API and a small app, built on Express, to manage your queue.

So, how do we build a scheduler using a job queue? Kue, has the ability to delay jobs. See where the poor-man's implementation comes in. Delayed jobs are polled by Kue itself so we don't need to think about setting up a separate worker to poll the system.

To install kue npm install kue --save

Create a new file in your root folder of your Keystone project and name it jobs.js.

In jobs.js, we first require kue as our dependency and instantiate a queue.

var kue = require('kue');
var queue = kue.createQueue();

By default, Kue will try to connect to the default redis-server running on localhost:6379. If you want to customise the redis settings, you can check out the docs.

Next, we need to require keystone to access the Post model and carry out our job, and moment to calculate the delay amount for scheduled posts.

var moment = require('moment');
var keystone = require('keystone');

Now, to define the main job. This function is the exported function that our model will call every time a post is scheduled, passing in the mongo document as its only parameter.

var schedulePost = function(doc) {
  var delay = moment(doc.schedulePost).diff(moment());

  var job = queue.create('post', {
    title: 'Post scheduling',
    doc: doc     
}).delay(delay).priority('high').save(function (err) {
    if (err) console.log(err);
    if (!err) console.log('Saved new job with id: ', job.id);
  });

  job.on('complete', function(result) {
    console.log('Job completed ', result);
  }).on('failed', function(errorMessage) {
    console.log(errorMessage);
  });
};

All we do in this function is calculate the delay using the .diff() function in moment, create a new job, passing in the mongo document to the job along with a title. We set the priority of this job to high and finally call .save() on the newly created job. Kue also has events that are fired on a given job instance. For now, we'll only be using the complete and failed events.

Once, a job is created and added to the queue, we need to process it by calling on queue.process.

queue.process('post', function(job, done) {
    makePost(job.data.doc, done);
});

The first argument is the name of the job we want to process - which for us is post. The callback function accepts a job object and a done callback. We pass in the mongo document and callback to our makePost function and let it handle the rest.

The last part of the puzzle - the makePost function is what carries out the actual processing of the document that was provided to us. On a basic level, all we need to do is essentially assign a publishedAt date and change its state from draft to published for a given post.

function makePost(doc, cb) {
 keystone.list('Post').model.findById(doc.id).exec(function (err, item) {
         if (err) return callback(err);

         item.state = 'published';
         item.pickedUp = true;
         item.publishedAt = new Date();

         item.save(function (err) {
             if (err) return callback(err);
             callback();
         });
    });
}

We query the model with the id of the document. If no errors turn up, we assign a new Date() and change the state of the post to published before finally saving it. One things you'll notice is that I placed an extra boolean variable called pickedUp. This is to make sure that the worker does not pick up the same document more than once should you have multiple workers. And this job is being called from the Post model in a post save hook. Not checking for this boolean will lead to a infinite loop because the hook and hence the job will be called every time the document is saved.

Our final jobs.js now looks like this

var kue = require('kue');
var moment = require('moment');
var keystone = require('keystone');
var queue = kue.createQueue();

var schedulePost = function(doc) {
  var delay = moment(doc.schedulePost).diff(moment());

  var job = queue.create('post', {
    title: 'Post scheduling',
    doc: doc     
}).delay(delay).priority('high').save(function (err) {
    if (err) console.log(err);
    if (!err) console.log('Saved new job with id: ', job.id);
  });

  job.on('complete', function(result) {
    console.log('Job completed ', result);
  }).on('failed', function(errorMessage) {
    console.log(errorMessage);
  });
};

queue.process('post', function(job, done) {
    makePost(job.data.doc, done);
});

function makePost(doc, cb) {  keystone.list('Post').model.findById(doc.id).exec(function (err, item) {
         if (err) return callback(err);

         item.state = 'published';
         item.pickedUp = true;
         item.publishedAt = new Date();

         item.save(function (err) {
             if (err) return callback(err);
             callback();
         });
    });
}

module.exports = schedulePost;

To finish off this feature, we'll queue a job in the post save hook of our mongoose schema.

Post.schema.post('save', function (doc) {
  var jobs = require('../jobs.js');

  if (doc && doc.schedulePost && !doc.pickedUp) {
    jobs.schedulePost(doc);
  }
});


comments powered by Disqus