Messaging Queues with Remix

23rd January 2022

I'm currently working on Docbot, which writes your documentation for you. This involves a lot of time-consuming work which we want to run asynchronously so you don't need to make a cup of tea each time you make a change to your code. It's important that we get this right because it's so critical to the UX.

This isn't a tutorial! If you're interested in getting the complete code, get in touch at jamie@docbot.dev.

Remix has an excellent DX and lets us build quickly without sacrificing correctness or performance. However, messaging queues are outside of its scope so we need to build that part ourselves. I think we've done a good job and I'd like to share it with you 👉

The tech stack consists of:

  • Redis
  • BullMQ - A robust queue built on top of Redis
  • Express
  • Fly - Edge-first hosting provider

API

Our queues are defined within /app/workers. Each TypeScript file contains a single queue and its corresponding worker (the thing that processes our jobs). They can use ~ imports to reference other code within /app, allowing us to share code between the Remix app and the workers.

// app/workers/company.server.ts

import { Queue, Worker } from "bullmq";
import { bullMqOptions } from "~/bullmq.server";
import { db } from "~/utils/db.server";

type Data = {
  companyId: string;
};

export const companyQueue = new Queue<Data>("company", bullMqOptions);

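const worker = new Worker<Data>(
  companyQueue.name,
  async (job) => {
    const company = await db.company.findUnique({
      where: {
        id: job.data.companyId,
      },
    });
    // findUnique resolves to null when no row matches; throwing marks the job as failed
    if (!company) {
      throw new Error(`Company ${job.data.companyId} not found`);
    }
    // doWork stands in for the actual processing, which isn't shown here
    await doWork(company);
  },
  bullMqOptions
);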

worker.on("completed", (job) => {
  console.info(`${job.id} completed`);
});
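
The listener above only reports successes. Here is a minimal sketch of logging failures as well, assuming BullMQ's "failed" and "error" worker events. attachWorkerLogging and the EmitterLike, JobLike and Logger types are hypothetical names, introduced so the snippet runs without a Redis connection; a real Worker should be usable in place of the fake since it exposes the same on method.

```typescript
// Hypothetical structural types, not part of BullMQ.
type JobLike = { id?: string };
type EmitterLike = {
  on(event: string, listener: (...args: any[]) => void): unknown;
};
type Logger = { info(msg: string): void; error(msg: string): void };

function attachWorkerLogging(worker: EmitterLike, log: Logger = console): void {
  worker.on("completed", (job: JobLike) => {
    log.info(`${job.id} completed`);
  });
  // The job argument can be undefined if the job could not be loaded.
  worker.on("failed", (job: JobLike | undefined, error: Error) => {
    log.error(`${job?.id ?? "unknown job"} failed: ${error.message}`);
  });
  // Problems with the worker itself (for example a lost Redis connection).
  worker.on("error", (error: Error) => {
    log.error(`worker error: ${error.message}`);
  });
}
```

With the worker from company.server.ts this would be called as attachWorkerLogging(worker).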

Workers are enabled by re-exporting them in /server/workers.ts.

// server/workers.ts

export * from "~/workers/company.server";
export * from "~/workers/project.server";

Now we can push jobs to the queue from other parts of the app, for example within a Remix action:

// app/routes/company.tsx

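import type { ActionFunction } from "remix"; // "@remix-run/node" in newer Remix versions
import { companyQueue } from "~/workers/company.server";

export let action: ActionFunction = async () => {
    // [...]
    await companyQueue.add("sync", {
        // Data.companyId is typed as a string
        companyId: "1243",
    });
    // [...]
};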
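
Because the queue is typed with Data, call sites can be wrapped in a small helper that also sets retry behaviour. This is a minimal sketch, assuming BullMQ's attempts and backoff job options. enqueueCompanySync, QueueLike and RetryOptions are hypothetical names, and the retry values are illustrative rather than what Docbot uses. The structural QueueLike type lets the helper be exercised without Redis; a Queue<Data> should be accepted in its place.

```typescript
type Data = { companyId: string };

// The subset of job options used in this sketch.
type RetryOptions = {
  attempts: number;
  backoff: { type: "exponential" | "fixed"; delay: number };
};

// Hypothetical structural type describing the one queue method we call.
type QueueLike = {
  add(name: string, data: Data, opts?: RetryOptions): Promise<unknown>;
};

async function enqueueCompanySync(
  queue: QueueLike,
  companyId: string
): Promise<void> {
  // Reject obviously invalid input before it reaches the queue.
  if (companyId.trim() === "") {
    throw new Error("companyId must be a non-empty string");
  }
  await queue.add(
    "sync",
    { companyId },
    {
      // Try up to 3 times in total, waiting longer between each retry.
      attempts: 3,
      backoff: { type: "exponential", delay: 1000 },
    }
  );
}
```

Inside an action this would read await enqueueCompanySync(companyQueue, "1243").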

Server

Remix isn't a server; it's a request handler that you give to your actual server. In our case that's a custom Express server, /server/index.ts, which sets up the request handler and starts the workers. The Remix docs have a good example of this.

Local environment

For local development we have a docker-compose.yml containing the services needed to run the app. This includes Postgres and Redis.

# docker-compose.yml

version: "3"
services:
  postgres:
    image: postgres:latest
    ports:
      # Accessible from the host at port :35433
      - "35433:5432"
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: docbot
    # Make sure log colours show up correctly
    tty: true
  redis:
    image: redis:latest
    ports:
      - "6379:6379"
    tty: true

We use PM2 to manage multiple processes and watch for file changes. One of these processes is our server, which is started with Node and esbuild (via esbuild-register). tsconfig-paths is required for our absolute ~ imports to work.

// pm2.config.js

module.exports = {
  apps: [
    // [...]
    {
      name: "Server",
      script:
        "node -r ./mocks -r dotenv/config -r tsconfig-paths/register -r esbuild-register server",
      watch: [
        "./app/workers/**/*.ts",
        "./server/**/*.ts",
        "./.env",
      ],
    },
  ],
};

CI

GitHub Actions makes it really simple to run Redis for testing. Just define a service container and provide the credentials using env.

# .github/workflows/ci.yml

services:
    redis:
        image: redis
        options: >-
            --health-cmd "redis-cli ping"
            --health-interval 10s
            --health-timeout 5s
            --health-retries 5

env:
    REDIS_HOST: redis

Introducing messaging queues to our app also risks making our tests flaky. We're using Cypress for everything and we've written a useful command that waits until all jobs have finished executing.

// cypress/integration/sync.ts

it("Syncs the repositories", () => {
    cy.visit("...");

    // No arbitrary waits here, execution only moves on once all jobs finish
    cy.waitForJobsToComplete();
    cy.reload();

    cy.findByText("backend");
    cy.findByText("frontend");
});
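
The implementation of cy.waitForJobsToComplete() isn't shown in this post. One way such a wait could work is to poll each queue's job counts until nothing is waiting, active or delayed. The sketch below assumes that approach and BullMQ's getJobCounts method. waitForQueuesToDrain, pendingJobs and CountableQueue are hypothetical names, and in Cypress this would have to run on the Node side (for example behind cy.task), since the browser can't reach Redis.

```typescript
// Hypothetical structural type describing the one queue method we poll.
type CountableQueue = {
  name: string;
  getJobCounts(...types: string[]): Promise<Record<string, number>>;
};

// Job states that mean "there is still work to do".
const PENDING_STATES = ["waiting", "active", "delayed"];

async function pendingJobs(queue: CountableQueue): Promise<number> {
  const counts = await queue.getJobCounts(...PENDING_STATES);
  return PENDING_STATES.reduce((sum, state) => sum + (counts[state] ?? 0), 0);
}

async function waitForQueuesToDrain(
  queues: CountableQueue[],
  { timeoutMs = 30000, intervalMs = 100 } = {}
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const pending = await Promise.all(queues.map(pendingJobs));
    // Done once every queue reports zero pending jobs.
    if (pending.every((count) => count === 0)) return;
    if (Date.now() >= deadline) {
      throw new Error(`Jobs still pending after ${timeoutMs}ms`);
    }
    // Sleep briefly before polling again.
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}
```

The timeout turns a stuck job into a clear test failure instead of a hung run.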

Production environment

In production a custom build script generates our server with the workers bundled up.

// scripts/build.js

// [...]

require("esbuild")
  .build({
    entryPoints: ["./server/index.ts"],
    bundle: true,
    outdir: "./server-build",
    target: [`node17`],
    platform: "node",
    format: "cjs",
    logLevel: "info",
    external,
  })
  .catch((error) => {
    console.error(error);
    process.exit(1);
  });

We deploy the app to Fly using Docker, alongside our Redis deployment. We connect over Fly's internal networking, which uses IPv6, and our BullMQ configuration reflects this.

// app/bullmq.server.ts

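// Assumes NODE_ENV is set to "production" in the deployed app
const isProduction = process.env.NODE_ENV === "production";

export const bullMqOptions = {
  connection: {
    host: process.env.REDIS_HOST ?? "localhost",
    password: process.env.REDIS_PASSWORD ?? "",
    port: 6379,
    // Fly's internal network is IPv6; local Docker and CI use IPv4
    family: isProduction ? 6 : 4,
  },
};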

Conclusion

That's our setup at Docbot. Our stack also includes Tailwind, MSW, Cypress, Prisma, ESLint & Husky. We find it insanely productive, and if you'd like to see the code, reach out to me at jamie@docbot.dev.