2025. 6. 3. 21:41ㆍNodejs
최근에 진행 중인 프로젝트에 LLM 기반의 챗봇 기능을 추가하게 되었습니다. 사용자 맞춤형 상담을 제공하기 위해, 미리 정의된 프롬프트와 함께 고급 추론이 가능한 모델을 호출하는 방식으로 구현하게 되었습니다. 그런데 실제로 서비스를 붙여보니, 모델의 응답 시간이 짧게는 1분, 길게는 3분 이상 걸리는 경우도 있었습니다. 단일 요청으로 처리하기에는 너무 긴 시간이었고, 이로 인해 시스템 응답성과 리소스 관리에 고민이 생기기 시작했습니다.
처음에는 웹 소켓을 이용한 실시간 통신 방식도 고려했지만 모델 응답 시간이 길다 보니, 클라이언트를 계속 대기시키는 구조는 적절하지 않았고, 서버에서 작업이 완료되었을 때 클라이언트에 알림을 보내는 방식이 필요하다고 생각했기 때문입니다. 하지만 LLM 연동 자체도 처음이었고, 여기에 웹 소켓 운영 경험도 없다 보니 다른 방법을 찾아야겠다고 판단했습니다.
그래서 기존처럼 REST API를 사용하되, 서버는 오래 걸리는 작업을 백그라운드에서 따로 처리하고, 결과는 나중에 클라이언트에 전달하는 구조를 떠올리게 되었습니다. 이런 비동기 처리를 구현하기 위해 메시지 큐를 선택하게 되었습니다.
1. 메시지 큐
메시지 큐는 서로 독립적인 컴포넌트(프로세스, 서비스)가 비동기 방식으로 메시지를 주고받으며 통신할 수 있도록 도와주는 시스템입니다. 보통 생성자(Producer)가 보낸 메시지를 큐(Queue)에 저장하고, 소비자(Consumer)가 이 메시지를 꺼내서 처리하는 구조입니다.
(1) 메시지 큐의 구조
Producer : Producer는 작업을 요청하는 주체입니다. 직접 처리하지 않고, Queue에 메시지를 등록합니다.
Queue : Queue는 Producer가 등록한 메시지를 일시적으로 저장하는 공간입니다. 저장된 메시지는 등록된 순서대로 Consumer에게 전달됩니다.
Job : Job은 Queue에 등록된 하나의 작업 단위입니다. Producer가 Queue에 메시지를 등록하면, Job이라는 형태로 저장하고 관리합니다. Job은 메시지 본문과 실행 상태, 재시도 횟수 등의 메타데이터를 포함하고 있습니다.
❗하지만 모든 메시지 큐가 Job이라는 개념을 사용하는 것은 아니며, 어떤 시스템에서는 메시지를 직접 다루고, Job이라는 구조를 별도로 제공하지 않기도 합니다.
Consumer : Consumer는 Queue를 통해 전달받은 메시지를 직접 처리하는 주체입니다. Producer가 요청한 작업을 실제로 실행하는 역할을 합니다.
(2) 언제 사용할까?
비동기 작업 처리 : 시간이 오래 걸리는 작업을 백그라운드에서 처리하기 위해서 메시지 큐를 사용할 수 있습니다. 클라이언트 요청이 들어오면 메시지 큐에 작업을 등록하고 즉시 응답하여 병목을 줄일 수 있습니다.
트래픽 완충 및 부하 분산 : 짧은 시간에 많은 요청이 몰릴 경우, 서버가 모든 작업을 즉시 처리하지 않고 메시지 큐에 쌓아두고 순처적으로 처리함으로써, 일시적인 트래픽 증가로 인한 과부하나 요청 실패를 방지할 수 있습니다.
마이크로서비스 간 통신 : 동기적 통신은 응답을 기다려야 하기 때문에, 응답 지연이나 서비스 장애가 발생하면 다른 서비스에도 연쇄적으로 영향을 줄 수 있습니다. 반면, 메시지 큐를 이용한 비동기 메시지 기반 통신은 서비스 간 결합도를 낮춰, 이러한 장애 전파 문제를 효과적으로 줄일 수 있습니다.
(3) 대표적인 메시지 큐
- Rabbit MQ
- Kafka
- Bull MQ
2. BullMQ
현재 프로젝트는 NestJS 기반으로 구성되어 있고, NestJS에서는 @nestjs/bullmq 모듈을 통해 BullMQ를 공식적으로 지원합니다. 그리고 메시지 큐를 사용하는 목적이 다른 서비스 간의 통신이 아니라 비동기 작업 처리를 위해서였기 때문에, BullMQ를 선택하게 되었습니다.
(1) BullMQ의 구조
- Queue
- Worker (= Consumer)
- Job (= Message)
- Event
(2) BullMQ의 특징
- Redis를 메시지 브로커로 사용하기 때문에, Kafka나 RabbitMQ처럼 별도의 브로커 서버를 설치하거나 운영할 필요가 없습니다.
- Producer의 요청은 Job 단위로 관리되기 때문에, 작업의 상태 추적이나 실패 시 재시도 같은 처리를 별도 로직 없이 간단히 구현할 수 있습니다.
- 작업 상태를 실시간으로 감지할 수 있어, 작업의 시작, 완료, 실패 등 상황에 따라 다양한 처리를 쉽게 연결할 수 있습니다.
3. BullMQ - Job
BullMQ에서 작업은 Job이라는 단위로 관리됩니다. Job은 어떤 작업을 실행할지에 대한 데이터와 지연 시간, 우선 순위, 재시도 횟수, 상태 정보 등 다양한 메타데이터를 포함하고 있습니다.
주요 필드
id : 작업에 부여된 ID로, 작업을 조회하거나 작업의 정보를 수정할 때 사용할 수 있습니다. Queue에 Job을 등록할 때 ID를 명시적으로 지정할 수도 있고, 지정하지 않으면 BullMQ가 자동으로 생성합니다. ID는 Redis에서 작업을 구분하는 기준이 되기 때문에 중복되면 기존 작업이 덮어씌워지지 않고 무시됩니다. 특히 BullMQ는 작업이 성공하거나 실패하더라도 기본적으로 작업 데이터를 삭제하지 않기 때문에, 한 번 사용된 Job ID는 다시 사용할 수 없습니다. 다만, 필요하다면 작업이 완료되거나 실패했을 때 자동으로 삭제되도록 설정할 수도 있습니다.
queue.add('sendEmail', {}); // 자동으로 부여한다.
queue.add('sendEmail', {}, { jobId: 'job_1' }); // 지정한 ID를 사용한다.
name : Queue에 등록할 때 설정한 이름으로, 작업의 종류를 구분할 때 사용할 수 있습니다.
queue.add('sendEmail');
data : 작업을 처리할 때 사용할 데이터로, Queue에 등록할 때 전달할 수 있습니다. 데이터는 Redis에 평문으로 저장되기 때문에 민감한 데이터는 포함하지 않는게 좋고, 불가피하다면 암호화를 하는 것이 안전합니다. 별도로 설정하지 않으면 BullMQ는 성공/실패한 작업을 모두 보관하고 있기 때문에 데이터가 많다면 Redis 메모리 사용량이 높아집니다.
queue.add('sendEmail', { to: 'foo@bar.com', subject: 'hello' });
attemptsMade : 작업이 실패한 후 재시도한 횟수로, 재시도한 작업을 구분할 때 사용할 수 있습니다.
attemptsStarted : 작업을 시도한 횟수로, 작업이 active 상태가 되면 카운팅된다.
failedReason : 작업이 실패 시 에러 메시지를 담고 있습니다.
async process(job: Job): Promise<void> {
throw new Error('some error message');
}
console.log(job.failedReason); // some error message
stalledCounter : 작업이 중단된 횟수로, 중단(stalled)은 작업이 active 상태에서 completed나 failed 되지 않고 중간에 멈춘 것을 의미합니다.
timestamp : 작업이 등록된 시점으로, 타임스탬프 형식으로 나타냅니다. delay가 반영되지 않은 시점이며, 재시도해도 변경되지 않습니다.
processedOn : 작업이 시작된(active 상태로 전환된) 시점으로, 타임스탬프 형식으로 나타냅니다. stalled, failed 작업은 재시도할 때마다 변경된다.
finishedOn : 작업이 종료된(completed 또는 failed 상태로 전환된) 시점으로, 타임스탬프 형식으로 나타냅니다.
주요 상태
waiting : 작업이 실행되기를 기다리고 있는 상태입니다. 지연(delay) 없이 등록된 작업은 즉시 실행 대기 상태로 분류되며, Redis에서 bull:<queue_name>:wait 키(LIST)에 추가됩니다.
delayed : 특정 시간 이후에 실행되도록 예약된 작업 상태입니다. delay 옵션이 설정된 작업은 bull:<queue_name>:delayed 키(ZSET)에 추가되며, 설정된 시간이 지나면 자동으로 waiting 상태로 전환됩니다.
# 실행될 시간은 정확도를 위해 스케일링된 값을 저장한다.
$ 127.0.0.1:6379> zrange bull:my-queue:delayed 0 10 withscores
1) "40"
2) "7162630851657728" // (timestamp + delay) * 4096
active : 현재 Worker가 작업을 처리 중인 상태입니다. Job을 가져온 Worker는 해당 작업에 대한 락을 설정하며, 작업 ID는 bull:<queue_name>:active 키(LIST)에 추가됩니다.
completed : 작업이 정상적으로 완료된 상태입니다. 기본적으로는 bull:<queue_name>:completed 키(ZSET)에 저장되며, 자동으로 삭제 되지 않습니다. 필요하다면 removeOnComplete 옵션을 통해 작업 완료 시 자동으로 삭제되도록 설정할 수 있습니다.
# 완료된 시간은 timestamp 값을 저장한다.
127.0.0.1:6379> zrange bull:my-queue:completed 0 10 withscores
1) "1"
2) "1748687903072"
failed : 작업 처리 중 오류가 발생하여 실패한 상태입니다. 재시도 설정에 따라 다시 큐에 들어갈 수 있으며, 최종 실패한 작업은 bull:<queue_name>:failed 키(ZSET)에 저장됩니다. 마찬가지로 자동으로 삭제되지 않습니다. 필요하다면 removeOnFail 옵션을 통해 작업 완료 시 자동으로 섹제되도록 설정할 수 있습니다.
# 실패한 시간은 timestamp 값을 저장한다.
127.0.0.1:6379> zrange bull:my-queue:failed 0 10 withscores
1) "41"
2) "1748689466586"
stalled : 작업이 실행 중이었지만, 일정 시간 동안 진행되지 않아 중단된 것으로 간주된 상태입니다. 해당 작업은 bull:<queue_name>:stalled 키(LIST)에 임시로 추가된 후 자동으로 재처리됩니다.
4. BullMQ - 프로세스
초기화
서버가 시작되면 BullMQ는 작업을 관리하기 위해 Redis에 몇 가지 키를 생성합니다.
bull:<queue_name>:meta : 메타데이터를 저장하는 키(HASH)로, BullMQ의 버전과 지정한 옵션 정보들을 저장합니다.
127.0.0.1:6379> hgetall bull:my-queue:meta
1) "opts.maxLenEvents"
2) "10000"
3) "version"
4) "bullmq:5.53.1"
bull:<queue_name>:stalled-check : stalled 상태를 검사하기 위해 사용하는 키(STRING)로, 마지막으로 검사를 수행한 시점을 타임스탬프 형식으로 저장합니다. BullMQ는 이 값을 기준으로 일정 주기마다 stalled 검사를 수행하고, 중단된 작업이 있는지 판단합니다.
127.0.0.1:6379> get bull:my-queue:stalled-check
"1748746313806"
bull:<queue_name>:id : 작업에 부여할 ID를 관리하는 키(STRING)로, 작업이 등록될 때마다 순차적으로 증가시키며 고유 ID를 생성합니다.
127.0.0.1:6379> get bull:my-queue:id
"2"
bull:<queue_name>:events : 작업의 상태 변화를 기록하는 키(STREAM)로, 작업의 상태가 변경될때마다 작업 ID, 이전 상태, 변경된 상태를 저장합니다.
127.0.0.1:6379> xrange bull:my-queue:events - + count 10
1) 1) "1748744615340-0"
2) 1) "event"
2) "added"
3) "jobId"
4) "1"
5) "name"
6) "foo"
2) 1) "1748744615340-1"
2) 1) "event"
2) "delayed"
3) "jobId"
4) "1"
5) "delay"
6) "1748744620339"
3) 1) "1748744620361-0"
2) 1) "event"
2) "waiting"
3) "jobId"
4) "1"
5) "prev"
6) "delayed"
bull:<queue_name>:marker : 우선 순위가 지정된 작업, 지연된 작업, 처리 시간을 넘긴 작업 등 일반 작업보다 먼저 처리해야 하는 작업을 관리하는 키(ZSET)입니다.
마커는 BullMQ v5부터 추가된 개념으로 우선 순위 작업, 지연된 작업, 처리 시간을 넘긴 작업 등 일반적인 대기열과는 다른 조건을 가진 작업이 존재함을 워커에게 알려주는 역할을 합니다. 이를 통해 워커는 단순히 대기열만 감시하는 것이 아니라, 특수 조건을 만족한 작업이 있는지를 정확히 판단하고 적절한 시점에 처리할 수 있습니다.
작업 등록
Queue에 작업을 추가하면 BullMQ는 Redis에 다음과 같은 키를 등록합니다.
bull:<queue_name>:wait : 다음에 실행될 작업 목록을 관리하는 키(LIST)로, 작업 ID를 저장합니다.
127.0.0.1:6379> lrange bull:my-queue:wait 0 10
1) "25"
2) "24"
bull:<queue_name>:delayed : delay가 설정된 작업 목록을 관리하는 키(ZSET)로, 작업 ID와 함께 작업이 실행될 시점을 스케일링된 타임스탬프 형식으로 저장합니다. 지정된 시간이 도래하면 해당 작업은 wait 큐로 이동한 뒤, 워커에 의해 active 상태로 전환되어 실행됩니다.
127.0.0.1:6379> zrange bull:my-queue:delayed 0 10 withscores
1) "27"
2) "7162933838303232"
3) "28"
4) "7162933840007168"
bull:<queue_name>:<job_id> : 작업을 관리하고 추적하는 데 필요한 정보를 저장하는 키(HASH)로, 작업에 대한 이름, 데이터, 상태, 지연 시간, 재시도 횟수 등을 저장합니다.
127.0.0.1:6379> hgetall bull:my-queue:3
1) "name"
2) "foo"
3) "data"
4) "{}"
5) "opts"
6) "{\"delay\":5000,\"attempts\":10}"
7) "timestamp"
8) "1748746486914"
9) "delay"
10) "0"
11) "priority"
12) "0"
13) "processedOn"
14) "1748746491935"
15) "ats"
16) "1"
17) "atm"
18) "1"
19) "returnvalue"
20) "null"
21) "finishedOn"
22) "1748746501945"
bull:<queue_name>:<job_id>:lock : 작업이 중복 실행되지 않도록 잠금을 걸기 위해 사용하는 키(STRING)로, BullMQ는 이 키에 설정되어 있는 TTL을 주기적으로 갱신하여, 작업이 아직 실행되고 있음을 알립니다.
127.0.0.1:6379> get bull:my-queue:19:lock
"74d667ac-4727-4ddf-851c-16f8e461172d:259" # 워커를 식별하기 위한 토큰
bull:<queue_name>:active : 현재 처리 중인 작업 목록을 관리하는 키(LIST)로, 이 목록에 등록되는 작업 수는 워커의 concurrency 설정값에 따라 제한되며, 설정된 개수만큼의 작업만 동시에 실행됩니다.
127.0.0.1:6379> lrange bull:my-queue:active 0 10
1) "14"
2) "15"
작업 완료
작업이 성공적으로 처리되면 BullMQ는 해당 작업을 completed 상태로 전환하고 관련 정보를 Redis에 저장합니다.
bull:<queue_name>:completed : 완료된 작업 목록을 관리하는 키(ZSET)로, 작업 ID와 완료된 시간을 타임스탬프 형태로 저장합니다. 작업 상세 정보는 여전히 bull:<queue_name>:<job_id> 키에 남아 있으며, removeOnComplete을 true로 설정하면 완료된 작업 목록은 저장되지 않고, 작업 상세 정보도 함께 삭제됩니다.
127.0.0.1:6379> zrange bull:my-queue:completed 0 10 withscores
1) "20"
2) "1748762656607"
작업 실패
작업이 실패하면 BullMQ는 해당 작업을 failed 상태로 전환하고 관련 정보를 Redis에 저장합니다.
bull:<queue_name>:failed : 실패한 작업 목록을 관리하는 키(ZSET)로, 작업 ID와 실패한 시간을 타임스탬프 형태로 저장합니다. 작업 상세 정보는 여전히 bull:<queue_name>:<job_id> 키에 남아 있으며, removeOnFail을 true로 설정하면 실패한 작업 목록은 저장되지 않고, 작업 상세 정보도 함께 삭제됩니다.
127.0.0.1:6379> zrange bull:my-queue:failed 0 10 withscores
1) "2"
2) "1748756439785"
3) "3"
4) "1748756449792"
5) "4"
5. BullMQ - 중단된 작업에 대한 처리
stalled 상태는 워커가 작업을 정상적으로 마치지 못했을 때 발생합니다. 작업이 중단되는 원인은 다양하지만, 대표적으로 CPU 사용량이 과도해 워커가 작업을 계속 처리하지 못하는 경우나 워커 프로세스가 예기치 않게 종료된 경우 등이 있습니다.
BullMQ가 stalled 상태를 판단하는 과정은 아래와 같습니다.
- BullMQ는 bull:<queue_name>:stalled-check를 통해 마지막으로 stalled 검사를 수행한 시점을 확인하고, 일정 주기마다 검사를 수행합니다.
- 검사 시점마다 bull:<queue_name>:active 리스트에서 현재 처리 중인 작업 목록을 가져옵니다.
- 작업에 대해 bull:<queue_name>:<job_id>:lock 키의 존재 여부를 확인합니다.
- 만약 lock 키가 존재하지 않는다면, 해당 작업은 워커가 더 이상 실행하지 않고 있다고 판단되어 stalled 상태로 간주합니다.
- 이 작업은 bull:<queue_name>:active에서 제거되고, bull:<queue_name>:stalled 키에 추가됩니다.
- 이후 BullMQ는 이 stalled된 작업을 wait 상태로 변경하여 다시 실행합니다.
주의할 점은, stalled 상태의 작업은 일반적인 attempts와 무관하게 동작한다는 점입니다. Worker의 maxStalledCount가 기본값(1)인 경우, 작업이 멈추면 딱 한 번만 재시도되고 그 이후에도 실패하면 failed 상태로 바로 넘어갑니다.
{
jobId: '2',
prev: 'active',
failedReason: 'job stalled more than allowable limit'
}
'Nodejs' 카테고리의 다른 글
비동기 작업 처리를 위한 메시지 큐 연동 - Nestjs에서 BullMQ 사용하기 (0) | 2025.06.14 |
---|---|
NestJS + Swagger 사용 중 겪은 DTO 네이밍 충돌 이슈 (0) | 2025.05.29 |
NestJS - EC2에 배포하고 HTTPS 적용하기 (0) | 2025.03.09 |
NestJS - @Res() 데코레이터와 Interceptor를 함께 사용하면서 겪었던 이슈 (0) | 2025.03.05 |
슬랙으로 RSS 알림 보내기 (6) - 배포 (0) | 2025.03.03 |