Background Jobs

Overview

BookWish uses Bull for background job processing. Bull is a Redis-based queue system for Node.js that handles asynchronous tasks, scheduled jobs, and job retries.

Technology Stack

  • Queue: Bull 4.12
  • Cache/Store: Redis (IORedis 5.3)
  • Scheduler: Cron expressions

Queue Configuration

Queue Factory

// src/jobs/queue.ts
import Queue from 'bull';
import { env } from '../config/env';

// Parse the Redis URL once rather than re-parsing it per field
const redisUrl = new URL(env.REDIS_URL);

const redisConfig = {
  redis: {
    host: redisUrl.hostname,
    port: parseInt(redisUrl.port || '6379', 10),
    password: redisUrl.password || undefined,
  },
};

export function createQueue<T = any>(name: string): Queue.Queue<T> {
  return new Queue<T>(name, redisConfig);
}

Queue Initialization

Queues are initialized on server startup:

// src/index.ts
import { scheduleSquareSyncs } from './jobs/square-sync.job';
import { scheduleReservationCleanup } from './jobs/reservation-cleanup.job';
import { initializeDigestScheduler } from './jobs/order-digest.job';

async function main() {
  // ... database and Redis connection ...

  // Schedule recurring jobs
  await scheduleSquareSyncs();
  await scheduleReservationCleanup();
  initializeDigestScheduler();

  // Start server
  app.listen(env.PORT);
}

Job Types

1. Notification Job

File: src/jobs/notification.job.ts

Sends push and email notifications asynchronously.

Queue: notifications

Job Data:

interface NotificationJobData {
  notificationId: string;
  userId: string;
  type: NotificationType;
  title: string;
  body: string;
  orderId?: string;
  orderNumber?: string;
}

Processing:

notificationQueue.process(async (job: Job<NotificationJobData>) => {
  const { notificationId, userId, type, title, body } = job.data;

  // 1. Get user's push tokens
  const pushTokens = await prisma.pushToken.findMany({
    where: { userId },
  });

  // 2. Send push notification to all devices
  for (const tokenRecord of pushTokens) {
    await sendPush(tokenRecord.token, {
      title,
      body,
      data: { notificationId, type },
    });
  }

  // 3. Send email for critical notifications
  const emailTypes = [
    'order_update',
    'payment_issue',
    'premium_subscription_expiring',
  ];

  if (emailTypes.includes(type)) {
    const user = await prisma.user.findUnique({
      where: { id: userId },
      select: { email: true },
    });

    if (user?.email) {
      await sendEmail({
        to: user.email,
        subject: title,
        htmlBody: `<p>${body}</p>`,
        textBody: body,
      });
    }
  }
});

Queueing:

import { queueNotification } from '../jobs/notification.job';

await queueNotification({
  notificationId: notification.id,
  userId: user.id,
  type: 'order_update',
  title: 'Order Shipped',
  body: 'Your order #12345 has been shipped',
  orderId: order.id,
  orderNumber: order.orderNumber,
});

Retry Configuration:

  • Attempts: 3
  • Backoff: Exponential starting at 2 seconds
  • Remove on complete: Yes
  • Remove on fail: No
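The bullets above map directly onto Bull's per-job options. As an illustrative sketch (the constant name is ours, not from the codebase), this is the options object that would be passed as the second argument to `notificationQueue.add(...)`:

```typescript
// Illustrative only: Bull job options matching the retry bullets above.
const notificationJobOptions = {
  attempts: 3,                    // original try + 2 retries
  backoff: {
    type: 'exponential' as const,
    delay: 2000,                  // 2s, then 4s, then 8s
  },
  removeOnComplete: true,         // drop successful jobs from Redis
  removeOnFail: false,            // keep failures around for inspection
};
```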

2. Square Sync Job

File: src/jobs/square-sync.job.ts

Hourly sync of inventory from Square POS, acting as a backup to webhooks.

Queue: square-sync

Job Data:

interface SquareSyncJob {
  storeId: string;
}

Processing:

squareSyncQueue.process(async (job) => {
  const { storeId } = job.data;

  // Sync inventory from Square
  const result = await inventoryService.syncFromSquare(storeId);

  logger.info('square.sync_completed', { storeId, result });

  return result;
});

Scheduling:

export async function scheduleSquareSyncs(): Promise<void> {
  // Find all stores with Square integration
  const stores = await prisma.store.findMany({
    where: {
      inventorySource: 'square',
      squareAccessToken: { not: null },
    },
  });

  // Add recurring job for each store
  for (const store of stores) {
    await squareSyncQueue.add(
      { storeId: store.id },
      {
        jobId: `square-sync-${store.id}`, // Prevents duplicates
        repeat: {
          cron: '0 * * * *', // Every hour at minute 0
        },
        removeOnComplete: true,
        removeOnFail: false,
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 60000, // 1 minute
        },
      }
    );
  }
}

Manual Trigger:

export async function triggerStoreSync(storeId: string): Promise<void> {
  await squareSyncQueue.add(
    { storeId },
    {
      removeOnComplete: true,
      attempts: 1,
    }
  );
}

Schedule: Every hour at :00 (e.g., 1:00, 2:00, 3:00)

3. Reservation Cleanup Job

File: src/jobs/reservation-cleanup.job.ts

Releases stuck inventory reservations from abandoned/failed orders.

Queue: reservation-cleanup

Job Data: None (no payload)

Processing:

reservationCleanupQueue.process(async () => {
  // Find pending orders older than 30 minutes
  const cutoffTime = new Date();
  cutoffTime.setMinutes(cutoffTime.getMinutes() - 30);

  const stuckOrders = await prisma.order.findMany({
    where: {
      status: 'pending',
      createdAt: { lt: cutoffTime },
    },
    include: {
      items: {
        where: {
          inventoryId: { not: null },
        },
      },
    },
  });

  // Release inventory for each stuck order
  for (const order of stuckOrders) {
    for (const item of order.items) {
      if (item.inventoryId) {
        await prisma.inventory.update({
          where: { id: item.inventoryId },
          data: {
            reservedQuantity: { decrement: item.quantity },
          },
        });
      }
    }

    // Mark order as cancelled
    await prisma.order.update({
      where: { id: order.id },
      data: {
        status: 'cancelled',
        paymentStatus: 'failed',
      },
    });
  }

  return {
    releasedOrders: stuckOrders.length,
    releasedItems: stuckOrders.reduce((sum, o) => sum + o.items.length, 0),
  };
});

Scheduling:

export async function scheduleReservationCleanup(): Promise<void> {
  await reservationCleanupQueue.add(
    {},
    {
      jobId: 'reservation-cleanup-recurring',
      repeat: {
        cron: '*/15 * * * *', // Every 15 minutes
      },
      removeOnComplete: true,
      removeOnFail: false,
      attempts: 3,
      backoff: {
        type: 'exponential',
        delay: 30000, // 30 seconds
      },
    }
  );
}

Schedule: Every 15 minutes

4. Order Digest Job

File: src/jobs/order-digest.job.ts

Daily email digest of new orders for stores and BookWish operations.

Queue: order-digest

Job Data:

{ type: 'daily' }

Processing:

orderDigestQueue.process(async (job) => {
  // Process store digests
  await processStoreDigests();

  // Process BookWish ops digest
  await processBookwishOpsDigest();
});

async function processStoreDigests(): Promise<void> {
  // Get stores with pending digest items
  const storesWithPendingItems = await prisma.orderDigestQueue.groupBy({
    by: ['storeId'],
    where: { processed: false },
  });

  // Send digest email for each store
  for (const storeGroup of storesWithPendingItems) {
    const digestItems = await prisma.orderDigestQueue.findMany({
      where: { storeId: storeGroup.storeId, processed: false },
      include: { order: { include: { items: true, user: true } } },
    });

    const store = await prisma.store.findUnique({
      where: { id: storeGroup.storeId },
      include: { owner: true },
    });
    if (!store) continue; // Store was deleted; skip its digest items

    const ownerEmail = store.owner?.email || store.email;
    if (ownerEmail) {
      await emailService.sendStoreOrderDigest(
        store,
        ownerEmail,
        digestItems.map(item => ({
          order: item.order,
          isSpecialOrder: item.isSpecialOrder,
        }))
      );
    }

    // Mark as processed
    await prisma.orderDigestQueue.updateMany({
      where: { id: { in: digestItems.map(i => i.id) } },
      data: { processed: true },
    });
  }
}

Queueing Orders:

// Queue order for store digest
export async function queueStoreOrderForDigest(
  storeId: string,
  orderId: string,
  isSpecialOrder: boolean
): Promise<void> {
  await prisma.orderDigestQueue.create({
    data: { storeId, orderId, isSpecialOrder },
  });
}

// Queue order for BookWish ops digest
export async function queueOrderForOpsDigest(orderId: string): Promise<void> {
  await prisma.bookwishOpsDigestQueue.create({
    data: { orderId },
  });
}

Scheduling:

export function initializeDigestScheduler(): void {
  orderDigestQueue.add(
    { type: 'daily' },
    {
      repeat: { cron: '0 6 * * *' }, // Every day at 6 AM
      removeOnComplete: true,
      removeOnFail: false,
    }
  );
}

Schedule: Daily at 6:00 AM

Cron Expression Reference

┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday)
│ │ │ │ │
│ │ │ │ │
* * * * *

Examples:

  • 0 * * * * - Every hour at minute 0
  • */15 * * * * - Every 15 minutes
  • 0 6 * * * - Daily at 6:00 AM
  • 0 0 * * 0 - Weekly on Sunday at midnight
  • 0 0 1 * * - Monthly on the 1st at midnight

Job Configuration Options

Common Options

await queue.add(
  jobData,
  {
    // Job ID (prevents duplicates)
    jobId: 'unique-job-id',

    // Retry configuration
    attempts: 3,
    backoff: {
      type: 'exponential', // or 'fixed'
      delay: 2000, // milliseconds
    },

    // Cleanup
    removeOnComplete: true,
    removeOnFail: false,

    // Delayed execution
    delay: 5000, // 5 seconds

    // Recurring jobs
    repeat: {
      cron: '0 * * * *',
    },

    // Priority (1 is highest; larger numbers are lower priority)
    priority: 1,

    // Timeout
    timeout: 60000, // 60 seconds
  }
);

Retry Strategies

Exponential Backoff:

backoff: {
  type: 'exponential',
  delay: 2000, // Start at 2 seconds
}
// Retry delays: 2s, 4s, 8s, 16s, ...
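As a quick check of that progression: exponential backoff doubles the base delay for each subsequent retry, i.e. delay × 2^(n−1). A small illustrative helper (not part of the codebase):

```typescript
// Computes the delay before the nth retry under exponential backoff:
// baseMs * 2^(n - 1), matching the 2s, 4s, 8s, ... progression above.
function exponentialDelay(baseMs: number, retryNumber: number): number {
  return baseMs * 2 ** (retryNumber - 1);
}

// exponentialDelay(2000, 1) -> 2000 (2s)
// exponentialDelay(2000, 2) -> 4000 (4s)
// exponentialDelay(2000, 3) -> 8000 (8s)
```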

Fixed Backoff:

backoff: {
  type: 'fixed',
  delay: 5000, // Always 5 seconds
}
// Retry delays: 5s, 5s, 5s, ...

Job Events

Listening to Events

queue.on('completed', (job, result) => {
  logger.info('job.completed', { jobId: job.id, result });
});

queue.on('failed', (job, error) => {
  logger.error('job.failed', {
    jobId: job.id,
    error: error.message,
  });
});

queue.on('stalled', (job) => {
  logger.warn('job.stalled', { jobId: job.id });
});

queue.on('progress', (job, progress) => {
  logger.info('job.progress', { jobId: job.id, progress });
});

Available Events

  • completed - Job finished successfully
  • failed - Job failed after all retries
  • stalled - Job's processing lock expired (typically a crashed or blocked worker); it may be re-run
  • progress - Job reported progress
  • active - Job started processing
  • waiting - Job added to queue
  • delayed - Job scheduled for later
  • removed - Job removed from queue
  • cleaned - Old jobs cleaned up

Job Progress Reporting

queue.process(async (job) => {
  // Report progress (0-100)
  await job.progress(25);

  // Do some work
  await doStep1();
  await job.progress(50);

  await doStep2();
  await job.progress(75);

  await doStep3();
  await job.progress(100);

  return { success: true };
});

Queue Management

Pause/Resume Queue

// Pause queue (stop processing)
await queue.pause();

// Resume queue
await queue.resume();

Clean Old Jobs

// Remove completed jobs older than 1 hour
await queue.clean(3600000, 'completed');

// Remove failed jobs older than 1 day
await queue.clean(86400000, 'failed');

Get Queue Status

const jobCounts = await queue.getJobCounts();
// {
//   waiting: 5,
//   active: 2,
//   completed: 100,
//   failed: 3,
//   delayed: 0,
//   paused: 0
// }
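These counts can feed a pass/fail health signal for monitoring. Sketch only; the function name and threshold values are illustrative, not from the codebase:

```typescript
// Decide whether a queue is healthy from getJobCounts() output.
// Thresholds are example values; tune them per queue.
interface JobCounts {
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
}

function isQueueHealthy(
  counts: JobCounts,
  maxWaiting = 1000,
  maxFailed = 50
): boolean {
  return counts.waiting <= maxWaiting && counts.failed <= maxFailed;
}
```

A health endpoint could call this per queue and return 503 when any queue reports unhealthy, so alerting catches backlogs before users notice.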

Error Handling

Automatic Retries

Jobs automatically retry on failure based on configuration:

await queue.add(
  jobData,
  {
    attempts: 3, // Total attempts (original + 2 retries)
    backoff: {
      type: 'exponential',
      delay: 2000,
    },
  }
);

Failed Job Handler

queue.on('failed', async (job, error) => {
  logger.error('job.failed', {
    jobId: job.id,
    attempts: job.attemptsMade,
    error: error.message,
    stack: error.stack,
  });

  // Optionally alert on critical failures
  if (job.data.type === 'critical') {
    await alertOps(`Job ${job.id} failed after ${job.attemptsMade} attempts`);
  }
});

Best Practices

  1. Idempotent Jobs - Jobs should be safe to run multiple times
  2. Fail Fast - Return early on invalid data
  3. Log Progress - Log important steps for debugging
  4. Handle Errors Gracefully - Catch and log errors, don't crash
  5. Set Timeouts - Prevent jobs from running forever
  6. Clean Up - Remove old completed jobs
  7. Monitor Queues - Track queue depth and failed jobs
  8. Use Job IDs - Prevent duplicate jobs with unique IDs
  9. Graceful Shutdown - Wait for active jobs to complete
  10. Test Jobs - Write tests for job processing logic
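Practices 1 and 8 reinforce each other: a deterministic job ID derived from the payload makes Bull drop duplicate adds, which in turn makes re-enqueueing safe. A minimal illustrative helper (the name and date-bucketing scheme are assumptions, not from the codebase):

```typescript
// One digest job per store per calendar day: adding twice with the same
// ID is a no-op, so a scheduler can fire repeatedly without harm.
function digestJobId(storeId: string, date: Date): string {
  return `digest-${storeId}-${date.toISOString().slice(0, 10)}`;
}

// digestJobId('store-1', new Date('2024-03-05T10:00:00Z'))
//   -> 'digest-store-1-2024-03-05'
```

Passed as `jobId` in the options to `queue.add`, this mirrors the `square-sync-${store.id}` pattern used by the Square sync scheduler.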

Monitoring

Queue Metrics

// Get queue stats
const waiting = await queue.getWaitingCount();
const active = await queue.getActiveCount();
const failed = await queue.getFailedCount();

logger.info('queue.metrics', {
  queue: 'notifications',
  waiting,
  active,
  failed,
});

Job Inspection

// Get specific job
const job = await queue.getJob(jobId);

// Get job state
const state = await job.getState();
// 'waiting' | 'active' | 'completed' | 'failed' | 'delayed'

// Get job logs
const logs = await queue.getJobLogs(jobId);

Graceful Shutdown

async function gracefulShutdown() {
  logger.info('jobs.shutdown_initiated');

  // Close all queues (close() waits for active jobs to finish by default)
  await notificationQueue.close();
  await squareSyncQueue.close();
  await reservationCleanupQueue.close();
  await orderDigestQueue.close();

  logger.info('jobs.shutdown_completed');
}

process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);