2025. 6. 14. 16:54ㆍNodejs
이전 글에서 BullMQ가 어떤 구조로 동작하는지, 왜 사용하는지에 대해 정리해봤습니다. 이번 글에서는 NestJS에서 BullMQ를 사용하는 방법에 대해 살펴보겠습니다.
1. @nestjs/bullmq
NestJS는 BullMQ를 공식적으로 지원하며, `@nestjs/bullmq` 패키지를 사용하면 간단하게 연동할 수 있습니다.
설치
먼저 BullMQ 관련 패키지를 설치합니다.
$ pnpm install @nestjs/bullmq bullmq
BullMQ는 Redis 기반으로 동작하기 때문에, 실행 환경에 Redis가 먼저 실행되어 있어야 합니다. 저는 Docker 통해 실행하였습니다.
$ docker pull redis:latest
$ docker run -d -p 6379:6379 --name redis redis:latest
2. BullModule
이제 BullMQ를 사용하기 위해서 BullModule을 추가합니다.
@Module({
imports: [
BullModule.forRoot({
connection: {
host: 'localhost',
port: 6379
}
})
]
})
export class AppModule {}
BullModule의 주요 옵션은 아래와 같습니다.
{
// Redis 연결과 관련된 옵션
connection: {
host: 'localhost',
port: 6379
},
// 모든 작업(Job)에 기본적으로 적용할 옵션
// 큐에 작업을 추가할 때 옵션을 지정하면 기본값을 오버라이드 합니다.
defaultJobOptions: {
// 작업에 우선 순위
// 1 ~ MAX_INT까지 지정할 수 있으며, 낮을수록 우선순위가 높습니다.
// *우선순위는 성능에 약간의 영향을 미칠 수 있기 때문에 꼭 필요한 경우에만 사용해야 합니다.
priority: 1,
// 작업의 지연 시간
// 밀리초 단위로 지정합니다.
delay: 1000,
// 작업이 실패할 경우 재시도할 횟수
attempts: 3,
// 재시도 시 몇 초 간격으로 실행할지
// 밀리초 단위로 지정하거나
backoff: 1000,
// 아래와 같이 세밀하게 지정할 수 있습니다.
backoff: {
// exponential : 재시도 횟수에 따라 증가 ex) 1초 -> 2초 -> 4초 ...
// fixed : 고정된 간격 ex) 1초 간격
type: 'exponential',
delay: 1000
},
// 완료 시 삭제
removeOnComplete: true,
// 실패 시 삭제
removeOnFail: true,
// 마지막에 추가된 작업이 먼저 실행되도록 변경합니다.
lifo: true,
// 작업의 제한 시간을 설정합니다.
// 제한 시간을 넘어가면 강제로 실패 처리 됩니다.
timeout: 10000,
}
}
3. Queue
작업을 관리할 Queue를 등록합니다.
@Module({
imports: [
BullModule.forRoot({
connection: {
host: 'localhost',
port: 6379
}
})
BullModule.registerQueue({
name: 'my-queue'
})
]
})
export class AppModule {}
registerQueue의 주요 옵션은 아래와 같습니다. 전체 옵션은 BullMQ 공식 문서에서 확인할 수 있습니다.
{
// BullModule의 옵션과 동일합니다.
connection: {
...
},
// BullModule의 옵션과 동일합니다.
// 이 설정이 우선 적용됩니다.
defaultJobOptions: {
...
},
// Queue를 식별하기 위한 이름입니다.
// Producer와 Consumer가 Queue에 접근하려면 이 값을 사용해야 합니다.
name: 'my-queue',
// Redis에 저장되는 키 앞에 붙는 prefix를 설정합니다.
// 기본값은 'bull'입니다.
prefix: 'my-service',
// Worker를 직접 연결합니다.
// 외부 파일을 Worker로 등록합니다.
processors: ['./my.worker.js'],
processors: [
{
concurrency: 10,
path: './my.worker.js',
useWorkerThreads: true
},
],
// Callback 함수를 Worker를 등록합니다.
processors: [
(job: Job) => {
...
},
],
processors: [
{
concurrency: 10,
callback: async (job: Job) => {
...
},
},
],
// 작업의 상태 변화 이벤트를 저장하는 Redis 키(streams.events)에 대한 설정입니다.
streams: {
events: {
// 이벤트를 최대 몇 개까지 저장할지 설정합니다
maxLen: 100
}
}
}
4. Producer
여기서 Producer는 보통 서비스 클래스 입니다. @InjectQueue 를 통해서 Queue을 주입할 수 있습니다.
import { InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bullmq';
@Injectable()
export class AppService {
constructor(@InjectQueue('my-queue') private readonly myQueue: Queue) {}
}
이제 이 Queue에 아래와 같이 작업을 추가할 수 있습니다.
this.myQueue.add('doSome');
// 데이터 전달
this.myQueue.add('doSome', { foo: 'bar', baz: true });
// 옵션 설정
this.myQueue.add('doSome', { foo: 'bar', baz: true }, { delay: 1000 });
작업을 추가할 때 주요 옵션은 아래와 같습니다. 전체 옵션은 BullMQ 공식 문서에서 확인할 수 있습니다.
// defaultJobsOptions에 있는 옵션을 그대로 사용합니다.
{
// 작업 ID를 지정할 수 있습니다.
// 기본값은 숫자 값으로, 작업을 추가할 때마다 순차적으로 증가합니다.
jobId: 'my-job-1',
// 중복 작업 추가를 방지하기 위한 고유 ID를 설정합니다.
// 같은 ID를 가진 작업이 ttl(ms) 내에 추가되면 무시됩니다.
// ttl을 설정하면 일정 시간 내 중복 요청을 제한하는 Throttle 기능으로도 활용할 수 있습니다.
deduplication: {
id: 'my-job',
ttl: 500
}
}
💡 jobId로도 중복을 방지할 수 있지만, 중복 방지용으로 사용하려면 removeOnFail, removeOnComplete 설정과 함께 사용해야 하고, ttl을 지정하는 등의 세밀한 제어는 불가능합니다.
5. Consumer
Consumer는 Queue에 등록된 작업을 실제로 처리하는 컴포넌트입니다. BullMQ에서는 이를 Worker라고 부르며, NestJS에서는 @Processor 데코레이터를 사용해 Worker를 정의할 수 있습니다.
@Processor('my-queue', {
concurrency: 10
})
export class MyQueueWorker extends WorkerHost {
async process(job: Job): Promise<any> {
...
}
}
@Processor 데코레이터에 Worker의 옵션을 설정할 수 있습니다. 주요 옵션은 아래와 같습니다. 전체 옵션은 BullMQ 공식 문서에서 확인할 수 있습니다.
{
// 한 번에 병렬로 처리할 작업 수를 지정합니다.
// 기본값: 1
concurrency: 10,
// 일정 시간 내 처리할 작업 수를 제한합니다.
limiter: {
duration: 1000, // 1초 동안
max: 10, // 최대 10개의 작업만 처리
},
// 작업의 잠금 유지 시간을 설정합니다.
// Redis에서 작업을 처리 중이라는 것을 나타내는 키의 TTL을 설정합니다.
// 기본값: 30초
lockDuration: 30000,
// 작업 잠금의 TTL을 연장할 주기를 설정합니다.
// 긴 작업이 중간에 실패하지 않도록 주기적으로 TTL을 갱신합니다.
// 기본값: 10초
lockRenewTime: 10000,
// stalled 상태의 작업이 복구될 수 있는 최대 횟수를 설정합니다.
// 기본값: 1 (한 번만 복구 시도)
maxStalledCount: 1,
// stalled 작업을 감지할 주기를 설정합니다.
// 기본값: 30초
stalledInterval: 30000,
}
이렇게 정의한 Worker는 반드시 모듈에 등록해야 합니다.
@Module({
imports: [
BullModule.forRoot({
connection: {
host: 'localhost',
port: 6379
}
})
],
providers: [
MyQueueWorker
]
})
export class AppModule {
}
process 함수에 전달되는 Job에 대해서는 BullMQ의 공식 문서에서 확인할 수 있습니다. 주요 필드는 아래와 같습니다.
// Queue에 추가할 때 지정한 이름입니다.
// this.queue.add('my-job');
job.name
// Queue에 추가할 때 전달한 데이터입니다.
// this.queue.add('my-job', { foo: 'bar' });
job.data
// 작업이 실패 시 에러 메시지를 담고 있습니다.
// (throw한 에러의 메시지)
job.failedReason
// 작업이 실패한 후 재시도한 횟수
job.attemptsMade
// 작업이 등록된 시점으로, 타임스탬프 형식으로 나타냅니다. delay가 반영되지 않은 시점이며, 재시도해도 변경되지 않습니다.
job.timestamp
// 작업이 시작된(active 상태로 전환된) 시점으로, 타임스탬프 형식으로 나타냅니다. stalled, failed 작업은 재시도할 때마다 변경된다.
job.processedOn
// 작업이 종료된(completed 또는 failed 상태로 전환된) 시점으로, 타임스탬프 형식으로 나타냅니다.
job.finishedOn
6. EventListener
Queue와 Worker에서 발생하는 이벤트를 수신하려면 @OnQueueEvent 데코레이터와 @OnWorkerEvent 데코레이터를 사용하면 됩니다.
Queue 이벤트
Queue에서 발생하는 이벤트를 수신하려면, @QueueEventListener 데코레이터로 대상 Queue를 지정하고, 추상 클래스 QueueEventHost를 상속합니다. 어떤 이벤트를 수신할지는 @OnQueueEvent 데코레이터를 통해 설정합니다.
import {
InjectQueue,
OnQueueEvent,
QueueEventsHost,
QueueEventsListener,
} from '@nestjs/bullmq';
import { Queue } from 'bullmq';
@QueueEventsListener('my-queue')
export class MyQueueEventsListener extends QueueEventsHost {
constructor(
@InjectQueue('my-queue') private readonly myQueue: Queue<unknown>,
) {
super();
}
@OnQueueEvent('waiting')
onWaiting(args: { jobId: string; prev?: string }) {
}
@OnQueueEvent('delayed')
onDeleted(args: { delay: number; jobId: string }) {
}
@OnQueueEvent('active')
onActive(args: { jobId: string; prev?: string }) {
}
@OnQueueEvent('completed')
onComplete(args: { jobId: string; prev?: string; returnvalue: string }) {
}
@OnQueueEvent('failed')
async onFailed(args: { failedReason: string; jobId: string; prev?: string }) {
}
}
이벤트마다 전달되는 파라미터가 다르기 때문에, BullMQ 공식 문서를 확인하거나 OnQueueEvent의 타입을 따라가면 확인할 수 있습니다.
Worker 이벤트
Worker에서 발생하는 이벤트를 수신하려면, @Processor 데코레이터로 정의된 Worker 클래스 안에서 @OnWorkerEvent 데코레이터를 사용합니다.
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
@Processor('my-queue')
export class MyQueueWorker extends WorkerHost {
async process(job: Job): Promise<any> {
// ...
}
@OnWorkerEvent('active')
onActive(job: Job, prev: string): void {
}
@OnWorkerEvent('completed')
onCompleted(job: Job, result: any, prev: string): void {
}
@OnWorkerEvent('failed')
onFailed(job: Job, error: Error, prev; string): void {
}
}
이벤트마다 전달되는 파라미터가 다르기 때문에, BullMQ 공식 문서를 확인하거나 OnWorkerEvent의 타입을 따라가면 확인할 수 있습니다.
'Nodejs' 카테고리의 다른 글
비동기 작업 처리를 위한 메시지 큐 연동 - BullMQ 이해하기 (0) | 2025.06.03 |
---|---|
NestJS + Swagger 사용 중 겪은 DTO 네이밍 충돌 이슈 (0) | 2025.05.29 |
NestJS - EC2에 배포하고 HTTPS 적용하기 (0) | 2025.03.09 |
NestJS - @Res() 데코레이터와 Interceptor를 함께 사용하면서 겪었던 이슈 (0) | 2025.03.05 |
슬랙으로 RSS 알림 보내기 (6) - 배포 (0) | 2025.03.03 |