We’re here to work with you at all stages.

View all services
Build products using the latest engineering practices and designs that aren’t just functional but beautiful with Launch.
Learn more
Rethink how your product delivery teams build and design your products. Architect to build the building blocks that allow experimentation with Amplify.
Learn more
Gain market share by designing and building product features. Gain velocity by embedding our experts in your team with Catalyse.
Learn more
Take control of your cloud costs and technical debt, and add coverage for DevOps with Control.
Learn more

Articles

From user research, digital strategy to solving bold engineering problems. Our team specialises in providing a suite of services that take an idea from a rough sketch to an enterprise grade product.
View all articles

Tutorials

Learning new technologies and frameworks ensures we are ahead of the curve. Here is a collection of step by step tutorials about things we've learnt. Learn with us!
View all tutorials

Products

We love open source, and we love giving back. Take a look at our open source products and how we're pushing the bounds of Engineering excellence one product at a time
View all products

Culture

We believe the best digital products are built by a diverse and skilled team. We’ve created a safe inclusive workspace, and we believe in diversity. We are a group that believes in software development and design is a craft. This is what unites us.
Learn more

Mission, Vision & Purpose

Our team is diverse. Each coming from a different background and beliefs. We think of product development & design as a craft. We love to learn new ways of improving our craft - be it learning new frameworks, or adding new specialties.
Learn more

White Papers

We believe the best digital products are built by a diverse and skilled team. We’ve created a safe inclusive workspace, and we believe in diversity. We are a group that believes in software development and design is a craft. This is what unites us.
Learn more

Wednesday Wisdom

Our team is diverse. Each coming from a different background and beliefs. We think of product development & design as a craft. We love to learn new ways of improving our craft - be it learning new frameworks, or adding new specialties.
Learn more

White Papers

We believe the best digital products are built by a diverse and skilled team. We’ve created a safe inclusive workspace, and we believe in diversity. We are a group that believes in software development and design is a craft. This is what unites us.
Learn more

Wednesday Wisdom

Our team is diverse. Each coming from a different background and beliefs. We think of product development & design as a craft. We love to learn new ways of improving our craft - be it learning new frameworks, or adding new specialties.
Learn more
View all tutorials
[Part 3] Executing batch jobs in a multi-container environment using NodeJS and express.
August 4, 2021
Mohammed Ali Chherawalla
Software Engineer
Contents

This is the last tutorial in a 3-Part series. If you haven't completed the first 2 tutorials I would recommend going through them first herehere.

Add support for subscriptions

We will use RedisPubSub from graphql-redis-subscriptions to publish to subscription topics. We will publish a message when the user invokes the scheduleJob mutation. A new CRON will be registered that will execute every minute. This is done so that "an automated" message is published to the notifications topic every minute.

Let's start by installing all the dependencies.

Step 1

Install the necessary packages


yarn add graphql-subscriptions graphql-redis-subscriptions \
ioredis cors whatwg-fetch apollo-server-express \
http subscriptions-transport-ws@0.9.17

These are the required packages

  • graphql-subscriptions
  • graphql-redis-subscriptions
  • ioredis
  • cors
  • whatwg-fetch
  • apollo-server-express
  • http
  • subscriptions-transport-ws
Step 2

Create a new file


touch server/utils/pubsub.js

Copy the snippet below into the pubsub.js


import { RedisPubSub } from 'graphql-redis-subscriptions';
import Redis from 'ioredis';

const options = {
  host: process.env.REDIS_DOMAIN,
  port: process.env.REDIS_PORT,
  connectTimeout: 10000,
  retryStrategy: times =>
    // reconnect after
    Math.min(times * 50, 2000)
};

export const pubsub = new RedisPubSub({
  publisher: new Redis(options),
  subscriber: new Redis(options)
});

Step 3

Create a subscription topic.

Add the following snippet in the utils/constants.js file


export const SUBSCRIPTION_TOPICS = {
  NOTIFICATIONS: 'notifications'
};

Create the subscription file


touch server/gql/subscriptions.js

Copy the following snippet


import { GraphQLNonNull, GraphQLObjectType, GraphQLString, GraphQLInt } from 'graphql';
import { pubsub } from '@utils/pubsub';
import { SUBSCRIPTION_TOPICS } from '@utils/constants';
export const SubscriptionRoot = new GraphQLObjectType({
  name: 'Subscription',
  fields: {
    notifications: {
      type: new GraphQLObjectType({
        name: 'ScheduleJobSubscription',
        fields: () => ({
          message: {
            type: GraphQLNonNull(GraphQLString)
          },
          scheduleIn: {
            type: GraphQLNonNull(GraphQLInt)
          }
        })
      }),
      subscribe: (_, args) => pubsub.asyncIterator(SUBSCRIPTION_TOPICS.NOTIFICATIONS)
    }
  }
});

Make the following changes in the server/index.js


import cors from 'cors';
import { SubscriptionServer } from 'subscriptions-transport-ws/dist/server';
import { GraphQLSchema, execute, subscribe } from 'graphql';
import 'whatwg-fetch';
import { ApolloServer } from 'apollo-server-express';
import { createServer } from 'http';
import { SubscriptionRoot } from '@gql/subscriptions';
...
export const init = async () => {
  ...
  const schema = new GraphQLSchema({ query: QueryRoot, mutation: MutationRoot, subscription: SubscriptionRoot });
  ...
  app.use(rTracer.expressMiddleware());
  app.use(cors()); // 
  ...
  if (!isTestEnv()) {
    const httpServer = createServer(app);
    const server = new ApolloServer({
      schema
    });
    await server.start();
    server.applyMiddleware({ app });
		// 2
    const subscriptionServer = SubscriptionServer.create(
      { schema, execute, subscribe },
      { server: httpServer, path: server.graphqlPath }
    );
    ['SIGINT', 'SIGTERM'].forEach(signal => {
      process.on(signal, () => subscriptionServer.close());
    });
    httpServer.listen(9000, () => {
      console.log(`Server is now running on http://localhost:9000/graphql`);
    });
    initQueues();
  }

  1. Handle CORS error thrown by studio.apollographql
  2. Create a subscription server that will expose a websocket on the same pathname as the mutations and queries.

To test your subscriptions go to https://studio.apollographql.com/sandbox/explorer. Add http://localhost:9000/graphql in the top left URL bar.

Click documentation tab on the top left pane header and filter by subscription → notifications and you will see the newly added subscription.

Step 4

Copy the snippet below in the server/utils/queues.js



import { pubsub } from '@utils/pubsub';
import { SUBSCRIPTION_TOPICS } from '@utils/constants';
...

...
const CRON_EXPRESSIONS = {
  MIDNIGHT: '0 0 * * *',
  EVERY_MINUTE: '* * * * *'
};

export const QUEUE_NAMES = {
  ...,
  EVERY_MINUTE_CRON: 'everyMinuteCron'
};

export const QUEUE_PROCESSORS = {
  ...,
  [QUEUE_NAMES.EVERY_MINUTE_CRON]: (job, done) => {
    console.log(`publishing to ${SUBSCRIPTION_TOPICS.NOTIFICATIONS}`);
    pubsub.publish(SUBSCRIPTION_TOPICS.NOTIFICATIONS, {
      notifications: {
        message: 'This message is from the CRON',
        scheduleIn: 0
      }
    });
    done();
  }
};
export const initQueues = () => {
  console.log(' init queues');
  ...
  queues[QUEUE_NAMES.EVERY_MINUTE_CRON].add({}, { repeat: { cron: CRON_EXPRESSIONS.EVERY_MINUTE } });
};

This will add support to publish to the newly created notifications topic when the scheduleJob mutation is invoked.

Copy the snippet below in the server/gql/custom/scheduleJobMutation.js


import { pubsub } from '@utils/pubsub';
import { SUBSCRIPTION_TOPICS } from '@utils/constants';
...

  async resolve(source, args, context, info) {
    ...
    ...
    .then(job => {
        console.log(`${moment()}::Job with id: ${job.id} scheduled in ${args.scheduleIn} milliseconds`);
        pubsub.publish(SUBSCRIPTION_TOPICS.NOTIFICATIONS, {
            notifications: args
        })
        return { success: true };
      })
      ...
  },

This will create a CRON that runs every minutes and publishes a message to the notifications topic.

Step 5

Time to test out your subscription! Go to https://studio.apollographql.com/sandbox/explorer. Paste the snippet below in the left top pane


subscription Notifications {
  notifications {
    message
    scheduleIn
  }
}



Hit the Play button and you will see a subscription tab pop-up in the bottom right

Paste the snippet below in the left pane


mutation ScheduleJob($scheduleJobScheduleIn: Int!, $scheduleJobMessage: String!) {
  scheduleJob(scheduleIn: $scheduleJobScheduleIn, message: $scheduleJobMessage) {
    success
  }
}


Paste the snippet below in the variables pane


{
  "scheduleJobScheduleIn": 100,
  "scheduleJobMessage": "Scheduled job message"
}

Select ScheduleJob and hit the play button

Very soon you'll see another message come up in the subscriptions tab because of the EVERY_MINUTE CRON

Commit your code using the following git commands


git add .
git commit -m 'Add support for graphql redis subcriptions!'

Where to go from here

You can find the complete code here: https://github.com/wednesday-solutions/node-express-batch-jobs

I would recommend going through the articles below

If this series piqued your interest please stay tuned for the next tutorial in which we will write a CD pipeline to deploy this application using ECS.

I hope you enjoyed reading this series on how to create container-aware CRONS, scheduled jobs, and GraphQL Subscriptions. If you have any questions or comments, please join the forum discussion on Twitter.

Wednesday is a boutique consultancy based in India & Singapore.

Let's talk

Wednesday is a boutique consultancy based in India & Singapore.

Let’s talk

2023