Messaging Queues with Remix
23rd January 2022
I'm currently working on Docbot, which writes your documentation for you. This involves a lot of time-consuming work that we want to run asynchronously, so you don't need to make a cup of tea every time you change your code. It's important that we get this right because it's so critical to the UX.
This isn't a tutorial! If you're interested in the complete code, get in touch at jamie@docbot.dev.
Remix has an excellent DX and lets us build quickly without sacrificing correctness or performance. However, messaging queues are outside its scope, so we need to build that part ourselves. I think we've done a good job and I'd like to share it with you 👉
The tech stack consists of Remix, Express, BullMQ, Redis, Postgres, PM2, esbuild and Fly.
API
Our queues are defined within /app/workers. Each TypeScript file contains a single queue and its corresponding worker (the thing that processes our jobs). They can use ~ imports to reference other code within /app, allowing us to share code between the Remix app and the workers.
// app/workers/company.server.ts
import { Queue, Worker } from "bullmq";
import { bullMqOptions } from "~/bullmq.server";
import { db } from "~/utils/db.server";

type Data = {
  companyId: string;
};

export const companyQueue = new Queue<Data>("company", bullMqOptions);

const worker = new Worker<Data>(
  companyQueue.name,
  async (job) => {
    const company = await db.company.findUnique({
      where: {
        id: job.data.companyId,
      },
    });
    await doWork(company);
  },
  bullMqOptions
);

worker.on("completed", (job) => {
  console.info(`✅ ${job.id} completed`);
});
Workers are enabled by re-exporting them in /server/workers.ts.
// server/workers.ts
export * from "~/workers/company.server";
export * from "~/workers/project.server";
Now we can push jobs to the queue from other parts of the app, for example within a Remix action:
// app/routes/company.tsx
import type { ActionFunction } from "remix";
import { companyQueue } from "~/workers/company.server";

export let action: ActionFunction = async () => {
  // [...]
  await companyQueue.add("sync", {
    companyId: "1243",
  });
  // [...]
};
Server
Remix isn't a server; it's a request handler that you give to your actual server. In our case that's a custom Express server, /server/index.ts, which sets up the request handler and starts the workers. The Remix docs have a good example of this.
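For context, here's a minimal sketch of that file using the standard @remix-run/express adapter. It's illustrative rather than our exact setup: the paths, port handling and static file serving are simplified.
// server/index.ts (sketch)
import express from "express";
import { createRequestHandler } from "@remix-run/express";

// Importing this module for its side effects instantiates the BullMQ workers
import "./workers";

const app = express();
app.use(express.static("public"));

// Hand every request to Remix
app.all(
  "*",
  createRequestHandler({
    build: require("../build"),
    mode: process.env.NODE_ENV,
  })
);

const port = Number(process.env.PORT ?? 3000);
app.listen(port, () => {
  console.info(`Express server listening on port ${port}`);
});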
Local environment
For local development we have a docker-compose.yml containing the services needed to run the app. This includes Postgres and Redis.
# docker-compose.yml
version: "3"
services:
  postgres:
    image: postgres:latest
    ports:
      # Accessible from the host at port :35433
      - "35433:5432"
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: docbot
    # Make sure log colours show up correctly
    tty: true
  redis:
    image: redis:latest
    ports:
      - "6379:6379"
    tty: true
We use PM2 to manage multiple processes and watch for file changes. One of these processes is our server, which is started with Node and esbuild (via esbuild-register). tsconfig-paths is required for our absolute imports to work.
// pm2.config.js
module.exports = {
  apps: [
    // [...]
    {
      name: "Server",
      script:
        "node -r ./mocks -r dotenv/config -r tsconfig-paths/register -r esbuild-register server",
      watch: [
        "./app/workers/**/*.ts",
        "./server/**/*.ts",
        "./.env",
      ],
    },
  ],
};
CI
GitHub Actions makes it really simple to run Redis for testing. Just define a service container and provide the connection details using env.
# .github/workflows/ci.yml
services:
  redis:
    image: redis
    options: >-
      --health-cmd "redis-cli ping"
      --health-interval 10s
      --health-timeout 5s
      --health-retries 5
env:
  REDIS_HOST: redis
Introducing messaging queues to our app also risks introducing flakiness into our tests. We're using Cypress for everything, and we've written a useful command that waits until all jobs have finished executing.
// cypress/integration/sync.ts
it("Syncs the repositories", () => {
  cy.visit("...");
  // No arbitrary waits here, execution only moves on once all jobs finish
  cy.waitForJobsToComplete();
  cy.reload();
  cy.findByText("backend");
  cy.findByText("frontend");
});
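One way to implement waitForJobsToComplete is a Cypress task that polls BullMQ's job counts from the Node side until nothing is left to process. This is a rough sketch rather than our exact implementation: the queue names, task name, type declaration and polling interval are illustrative.
// cypress/plugins/index.ts (sketch)
import { Queue } from "bullmq";
import { bullMqOptions } from "../../app/bullmq.server";

// Query-only handles on the same queues the app uses
const queues = ["company", "project"].map(
  (name) => new Queue(name, bullMqOptions)
);

export default (on: Cypress.PluginEvents) => {
  on("task", {
    // Resolves to true once nothing is waiting, active or delayed on any queue
    async areJobsComplete() {
      const counts = await Promise.all(
        queues.map((queue) => queue.getJobCounts("waiting", "active", "delayed"))
      );
      return counts.every((count) =>
        Object.values(count).every((n) => n === 0)
      );
    },
  });
};

// cypress/support/commands.ts (sketch)
declare global {
  namespace Cypress {
    interface Chainable {
      waitForJobsToComplete(): void;
    }
  }
}

Cypress.Commands.add("waitForJobsToComplete", () => {
  const poll = () => {
    cy.task("areJobsComplete").then((done) => {
      if (!done) {
        cy.wait(250); // Brief pause before polling the queues again
        poll();
      }
    });
  };
  poll();
});

export {};
The plugins file runs in Node, so it can talk to Redis directly; the browser-side command just keeps asking the task until it reports that the queues are empty.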
Production environment
In production a custom build script generates our server with the workers bundled up.
// scripts/build.js
// [...]
require("esbuild")
  .build({
    entryPoints: ["./server/index.ts"],
    bundle: true,
    outdir: "./server-build",
    target: ["node17"],
    platform: "node",
    format: "cjs",
    logLevel: "info",
    external,
  })
  .catch((error) => {
    console.error(error);
    process.exit(1);
  });
We deploy the app to Fly using Docker, alongside our Redis deployment. We connect using Fly's internal networking, which uses IPv6, so our BullMQ configuration reflects this.
// app/bullmq.server.ts
const isProduction = process.env.NODE_ENV === "production";

export const bullMqOptions = {
  connection: {
    host: process.env.REDIS_HOST ?? "localhost",
    password: process.env.REDIS_PASSWORD ?? "",
    port: 6379,
    // Fly's internal networking is IPv6-only
    family: isProduction ? 6 : 4,
  },
};
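The Docker image itself can stay simple: install dependencies, build the Remix app and the bundled server, then run the output. Here's a rough sketch rather than our exact Dockerfile; the base image and build script name are illustrative.
# Dockerfile (sketch)
FROM node:17-bullseye-slim

WORKDIR /app

COPY package.json package-lock.json ./
RUN npm ci

COPY . .

# Assumes an npm script that runs `remix build` and scripts/build.js,
# producing ./build and ./server-build
RUN npm run build

ENV NODE_ENV=production

CMD ["node", "./server-build/index.js"]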
Conclusion
That's our setup at Docbot. Our stack also includes Tailwind, MSW, Cypress, Prisma, ESLint & Husky. We find it insanely productive, and if you'd like to see the code, reach out to me at jamie@docbot.dev.
✌