비동기 작업 처리를 위한 메시지 큐 연동 - Nestjs에서 BullMQ 사용하기

2025. 6. 14. 16:54Nodejs

반응형

BullMQ

 

이전 글에서 BullMQ가 어떤 구조로 동작하는지, 왜 사용하는지에 대해 정리해봤습니다. 이번 글에서는 NestJS에서 BullMQ를 사용하는 방법에 대해 살펴보겠습니다.

 

1. @nestjs/bullmq

NestJS는 BullMQ를 공식적으로 지원하며, `@nestjs/bullmq` 패키지를 사용하면 간단하게 연동할 수 있습니다.

설치

먼저 BullMQ 관련 패키지를 설치합니다.

$ pnpm install @nestjs/bullmq bullmq

 

BullMQ는 Redis 기반으로 동작하기 때문에, 실행 환경에 Redis가 먼저 실행되어 있어야 합니다. 저는 Docker 통해 실행하였습니다.

$ docker pull redis:latest
$ docker run -d -p 6379:6379 --name redis redis:latest

 

2. BullModule

이제 BullMQ를 사용하기 위해서 BullModule을 추가합니다.

@Module({
  imports: [
    BullModule.forRoot({
      connection: {
        host: 'localhost',
        port: 6379
      }
    })
  ]
})
export class AppModule {}

 

BullModule의 주요 옵션은 아래와 같습니다.

{
  // Redis 연결과 관련된 옵션
  connection: {
    host: 'localhost',
    port: 6379
  },
  
  // 모든 작업(Job)에 기본적으로 적용할 옵션
  // 큐에 작업을 추가할 때 옵션을 지정하면 기본값을 오버라이드 합니다.
  defaultJobOptions: {
    // 작업에 우선 순위
    // 1 ~ MAX_INT까지 지정할 수 있으며, 낮을수록 우선순위가 높습니다.
    // *우선순위는 성능에 약간의 영향을 미칠 수 있기 때문에 꼭 필요한 경우에만 사용해야 합니다.
    priority: 1,
	
    // 작업의 지연 시간
    // 밀리초 단위로 지정합니다.
    delay: 1000,
	
    // 작업이 실패할 경우 재시도할 횟수
    attempts: 3,
	
    // 재시도 시 몇 초 간격으로 실행할지
    // 밀리초 단위로 지정하거나
    backoff: 1000,
    // 아래와 같이 세밀하게 지정할 수 있습니다.
    backoff: {
	    // exponential : 재시도 횟수에 따라 증가 ex) 1초 -> 2초 -> 4초 ...
	    // fixed : 고정된 간격 ex) 1초 간격
	    type: 'exponential',
	    delay: 1000
    },
	
    // 완료 시 삭제
    removeOnComplete: true,
	
    // 실패 시 삭제
    removeOnFail: true,
	
    // 마지막에 추가된 작업이 먼저 실행되도록 변경합니다.
    lifo: true,
	
    // 작업의 제한 시간을 설정합니다.
    // 제한 시간을 넘어가면 강제로 실패 처리 됩니다.
    timeout: 10000,
  }
}

 

3. Queue

작업을 관리할 Queue를 등록합니다.

@Module({
  imports: [
    BullModule.forRoot({
      connection: {
        host: 'localhost',
        port: 6379
      }
    })
    BullModule.registerQueue({
      name: 'my-queue'
    })
  ]
})
export class AppModule {}

 

registerQueue의 주요 옵션은 아래와 같습니다. 전체 옵션은 BullMQ 공식 문서에서 확인할 수 있습니다.

{
  // BullModule의 옵션과 동일합니다.
  connection: {
    ...
  },

  // BullModule의 옵션과 동일합니다. 
  // 이 설정이 우선 적용됩니다.
  defaultJobOptions: {
    ...
  },

  // Queue를 식별하기 위한 이름입니다.
  // Producer와 Consumer가 Queue에 접근하려면 이 값을 사용해야 합니다.
  name: 'my-queue',

  // Redis에 저장되는 키 앞에 붙는 prefix를 설정합니다.
  // 기본값은 'bull'입니다.
  prefix: 'my-service',

  // Worker를 직접 연결합니다.
  // 외부 파일을 Worker로 등록합니다.
  processors: ['./my.worker.js'],
  processors: [
    {
      concurrency: 10,
      path: './my.worker.js',
      useWorkerThreads: true
    },
  ],
  // Callback 함수를 Worker를 등록합니다.
  processors: [
    (job: Job) => {
      ...
    },
  ],
  processors: [
    {
      concurrency: 10,
      callback: async (job: Job) => {
        ...
      },
    },
  ],

  // 작업의 상태 변화 이벤트를 저장하는 Redis 키(streams.events)에 대한 설정입니다.
  streams: {
    events: {
      // 이벤트를 최대 몇 개까지 저장할지 설정합니다
      maxLen: 100
    }
  }
}

 

4. Producer

여기서 Producer는 보통 서비스 클래스 입니다. @InjectQueue 를 통해서 Queue을 주입할 수 있습니다.

import { InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bullmq';

@Injectable()
export class AppService {
  constructor(@InjectQueue('my-queue') private readonly myQueue: Queue) {}
}

 

이제 이 Queue에 아래와 같이 작업을 추가할 수 있습니다.

this.myQueue.add('doSome');

// 데이터 전달
this.myQueue.add('doSome', { foo: 'bar', baz: true });

// 옵션 설정
this.myQueue.add('doSome', { foo: 'bar', baz: true }, { delay: 1000 });

 

작업을 추가할 때 주요 옵션은 아래와 같습니다. 전체 옵션은 BullMQ 공식 문서에서 확인할 수 있습니다.

// defaultJobsOptions에 있는 옵션을 그대로 사용합니다.
{	
  // 작업 ID를 지정할 수 있습니다.
  // 기본값은 숫자 값으로, 작업을 추가할 때마다 순차적으로 증가합니다.
  jobId: 'my-job-1',
	
  // 중복 작업 추가를 방지하기 위한 고유 ID를 설정합니다.
  // 같은 ID를 가진 작업이 ttl(ms) 내에 추가되면 무시됩니다.
  // ttl을 설정하면 일정 시간 내 중복 요청을 제한하는 Throttle 기능으로도 활용할 수 있습니다.
  deduplication: {
    id: 'my-job',
    ttl: 500
  }
}
💡 jobId로도 중복을 방지할 수 있지만, 중복 방지용으로 사용하려면 removeOnFail, removeOnComplete 설정과 함께 사용해야 하고, ttl을 지정하는 등의 세밀한 제어는 불가능합니다.

 

5. Consumer

Consumer는 Queue에 등록된 작업을 실제로 처리하는 컴포넌트입니다. BullMQ에서는 이를 Worker라고 부르며, NestJS에서는 @Processor 데코레이터를 사용해 Worker를 정의할 수 있습니다.

@Processor('my-queue', {
  concurrency: 10
})
export class MyQueueWorker extends WorkerHost {
  async process(job: Job): Promise<any> {
    ...
  }
}

 

@Processor 데코레이터에 Worker의 옵션을 설정할 수 있습니다. 주요 옵션은 아래와 같습니다. 전체 옵션은 BullMQ 공식 문서에서 확인할 수 있습니다.

{
  // 한 번에 병렬로 처리할 작업 수를 지정합니다.
  // 기본값: 1
  concurrency: 10,

  // 일정 시간 내 처리할 작업 수를 제한합니다.
  limiter: {
    duration: 1000, // 1초 동안
    max: 10,        // 최대 10개의 작업만 처리
  },

  // 작업의 잠금 유지 시간을 설정합니다.
  // Redis에서 작업을 처리 중이라는 것을 나타내는 키의 TTL을 설정합니다.
  // 기본값: 30초
  lockDuration: 30000,

  // 작업 잠금의 TTL을 연장할 주기를 설정합니다.
  // 긴 작업이 중간에 실패하지 않도록 주기적으로 TTL을 갱신합니다.
  // 기본값: 10초
  lockRenewTime: 10000,

  // stalled 상태의 작업이 복구될 수 있는 최대 횟수를 설정합니다.
  // 기본값: 1 (한 번만 복구 시도)
  maxStalledCount: 1,

  // stalled 작업을 감지할 주기를 설정합니다.
  // 기본값: 30초
  stalledInterval: 30000,
}

 

이렇게 정의한 Worker는 반드시 모듈에 등록해야 합니다.

@Module({
  imports: [
    BullModule.forRoot({
      connection: {
        host: 'localhost',
        port: 6379
      }
    })
  ],
  providers: [
    MyQueueWorker
  ]
})
export class AppModule {

}

 

process 함수에 전달되는 Job에 대해서는 BullMQ의 공식 문서에서 확인할 수 있습니다. 주요 필드는 아래와 같습니다.

// Queue에 추가할 때 지정한 이름입니다.
// this.queue.add('my-job');
job.name

// Queue에 추가할 때 전달한 데이터입니다.
// this.queue.add('my-job', { foo: 'bar' });
job.data

// 작업이 실패 시 에러 메시지를 담고 있습니다.
// (throw한 에러의 메시지)
job.failedReason

//  작업이 실패한 후 재시도한 횟수
job.attemptsMade

// 작업이 등록된 시점으로, 타임스탬프 형식으로 나타냅니다. delay가 반영되지 않은 시점이며, 재시도해도 변경되지 않습니다.
job.timestamp

// 작업이 시작된(active 상태로 전환된) 시점으로, 타임스탬프 형식으로 나타냅니다. stalled, failed 작업은 재시도할 때마다 변경된다.
job.processedOn

// 작업이 종료된(completed 또는 failed 상태로 전환된) 시점으로, 타임스탬프 형식으로 나타냅니다.
job.finishedOn

 

6. EventListener

Queue와 Worker에서 발생하는 이벤트를 수신하려면 @OnQueueEvent 데코레이터와 @OnWorkerEvent 데코레이터를 사용하면 됩니다.

Queue 이벤트

Queue에서 발생하는 이벤트를 수신하려면, @QueueEventListener 데코레이터로 대상 Queue를 지정하고, 추상 클래스 QueueEventHost를 상속합니다. 어떤 이벤트를 수신할지는 @OnQueueEvent 데코레이터를 통해 설정합니다.

import {
  InjectQueue,
  OnQueueEvent,
  QueueEventsHost,
  QueueEventsListener,
} from '@nestjs/bullmq';
import { Queue } from 'bullmq';

@QueueEventsListener('my-queue')
export class MyQueueEventsListener extends QueueEventsHost {
  constructor(
    @InjectQueue('my-queue') private readonly myQueue: Queue<unknown>,
  ) {
    super();
  }

  @OnQueueEvent('waiting')
  onWaiting(args: { jobId: string; prev?: string }) {
    
  }

  @OnQueueEvent('delayed')
  onDeleted(args: { delay: number; jobId: string }) {
    
  }

  @OnQueueEvent('active')
  onActive(args: { jobId: string; prev?: string }) {
    
  }

  @OnQueueEvent('completed')
  onComplete(args: { jobId: string; prev?: string; returnvalue: string }) {
    
  }

  @OnQueueEvent('failed')
  async onFailed(args: { failedReason: string; jobId: string; prev?: string }) {
    
  }
}

 

이벤트마다 전달되는 파라미터가 다르기 때문에, BullMQ 공식 문서를 확인하거나 OnQueueEvent의 타입을 따라가면 확인할 수 있습니다.

Worker 이벤트

Worker에서 발생하는 이벤트를 수신하려면, @Processor 데코레이터로 정의된 Worker 클래스 안에서 @OnWorkerEvent 데코레이터를 사용합니다.

import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';

@Processor('my-queue')
export class MyQueueWorker extends WorkerHost {
  async process(job: Job): Promise<any> {
    // ...
  }

  @OnWorkerEvent('active')
  onActive(job: Job, prev: string): void {
  
  }

  @OnWorkerEvent('completed')
  onCompleted(job: Job, result: any, prev: string): void {
  
  }

  @OnWorkerEvent('failed')
  onFailed(job: Job, error: Error, prev; string): void {
  
  }
}

 

이벤트마다 전달되는 파라미터가 다르기 때문에, BullMQ 공식 문서를 확인하거나 OnWorkerEvent의 타입을 따라가면 확인할 수 있습니다.

반응형