[시리즈 2편] 실무로 배우는 메시지 큐 - RabbitMQ
들어가며
[시리즈1]에서는 프로세스 내부 메시지 큐를 다뤘습니다. 이번엔 네트워크 메시지 큐인 RabbitMQ를 다룹니다.
RabbitMQ 공식 문서나 기술 블로그는 많지만, 실무에서 어떻게 사용하는지에 대한 글은 의외로 적습니다.
"Producer가 뭐고 Consumer가 뭔지는 알겠는데, 그래서 실제로는 어떻게 쓰는데?"
이번 글에서는 우리 MES 시스템에서 RabbitMQ를 어떻게 활용하고 있는지 실제 코드와 함께 공유합니다.
우리 시스템의 구성
MES 시스템은 관리자 PC 1대와 현장 Worker PC 수십 대가 연동됩니다. 관리자가 생산지시를 등록하면, 현장의 모든 Worker PC에 즉시 알림이 전달되어야 합니다.
현재 사용 중인 방식: RabbitMQ
RabbitMQ 아키텍처
┌───────────────────────────────────────────────────────────┐
│ RabbitMQ Server │
│ │
│ ┌─────────┐ ┌──────────┐ ┌───────┐ ┌──────────┐ │
│ │Producer │───→│ Exchange │───→│ Queue │───→│ Consumer │ │
│ │(서버) │ │(라우팅) │ │(대기열)│ │(클라이언트)││
│ └─────────┘ └──────────┘ └───────┘ └──────────┘ │
│ │
└───────────────────────────────────────────────────────────┘
Producer: Spring Boot API 서버
Exchange: 라우팅 규칙에 따라 메시지 전달
Queue: 각 Worker PC별 메시지 대기열 (ALARM_QUE_WPC001)
Consumer: C# Worker PC 클라이언트
실제 동작 방식
관리자가 생산지시 등록
│
└─→ RabbitMQ에 메시지 던짐 (끝!)
│
├─→ Worker PC 1 (즉시 전달)
├─→ Worker PC 2 (네트워크 복구 시 자동 전달)
└─→ Worker PC 3 (즉시 전달)
서버는 메시지 전송 후 바로 다음 작업 처리 가능
장점:
- 서버는 RabbitMQ에만 메시지 전달 (블로킹 없음)
- 네트워크 장애 시 자동 재시도
- Worker PC 증설 시 코드 수정 불필요
시스템 구조
MES 알림 시스템 전체 흐름
┌─────────────────┐
│ 관리자 PC │ 1. 생산지시 등록
│ │
└────────┬────────┘
│
▼
┌─────────────────┐ ┌─────────────────┐
│ Spring Boot │──────→│ RabbitMQ │ 2. 메시지 발송
│ API 서버 │ 2-1 │ 서버 │
└─────────────────┘ └────────┬────────┘
│ 3. 라우팅
┌─────────────────────────┼─────────────────────────┐
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Worker PC 1 │ │ Worker PC 2 │ │ Worker PC 3 │
│ (설비 A) │ │ (설비 B) │ │ (설비 C) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
4. 알림 팝업 표시 4. 알림 팝업 표시 4. 알림 팝업 표시
5. 데이터 새로고침 5. 데이터 새로고침 5. 데이터 새로고침
메시지 타입 정의
// 실시간 알림 메시지 타입
public enum MqMessageType
{
WORKORDER_ADDED, // 생산지시 추가
WORKORDER_DELETED, // 생산지시 삭제
...
}
코드로 보는 실제 구현
서버 측 (Spring Boot)
1. 설정 (application.yml)
spring:
rabbitmq:
host: 192.xxx.xxx.xxx
port: 5672
username: xxx
password: xxx
alarmmq:
exchange: alarm.topic # Topic Exchange
queue: ALARM_QUE_ # 큐 접두사
2. 메시지 발송 API
@RestController
@RequestMapping("/api")
public class QueueController {
private final RabbitTemplate rabbitTemplate;
@Value("${spring.alarmmq.exchange}")
private String EXCHANGE_NAME;
@PostMapping("/messagePub")
public CommonResult MessagePub(@RequestBody AlarmInfo[] alarmInfo) {
try {
// 1. 수신자 목록 추출
HashSet<String> userNoSet = new HashSet<>();
for (AlarmInfo alarm : alarmInfo) {
userNoSet.add(alarm.getUserNo());
}
// 2. 사용자별 큐 생성 및 바인딩
for (String userNo : userNoSet) {
Queue queue = new Queue("ALARM_QUE_" + userNo, true);
amqpAdmin.declareQueue(queue);
TopicExchange exchange = new TopicExchange(EXCHANGE_NAME);
amqpAdmin.declareBinding(
BindingBuilder.bind(queue).to(exchange).with(userNo + ".#")
);
}
// 3. 메시지 발송
for (AlarmInfo alarm : alarmInfo) {
rabbitTemplate.convertAndSend(
EXCHANGE_NAME, // Exchange
alarm.getUserNo(), // 라우팅 키
alarm // 메시지
);
}
return responseService.getSuccessResult();
} catch (Exception e) {
return responseService.getFailResult("MQE001", "메시지 전송 실패");
}
}
}
클라이언트 측 (C# WinForms)
1. MQ 클라이언트 초기화
public class BackgroundForm : BackgroundFormUI
{
private MqClient mqClient;
private void InitializeMqClient()
{
// 큐 이름: workerPc.WPC001
string queue = "workerPc." + ConnectionConfig.WorkerPcId;
mqClient = new MqClient(queue);
mqClient.MessageQueueReceived += MqClient_MessageQueueReceived;
if (mqClient.IsOpen)
ShowNotification("MQ 서버 연결 성공", NotificationType.Success);
else
ShowNotification("MQ 서버 연결 실패", NotificationType.Error);
}
}
2. 메시지 수신 처리
private void MqClient_MessageQueueReceived(MqMessageModel mqMessage)
{
string message = string.Empty;
switch (mqMessage.MqMessage)
{
case MqMessageType.WORKORDER_ADDED:
case MqMessageType.WORKORDER_DELETED:
message = "생산지시 정보가 업데이트 되었습니다.";
WorkorderUpdateRequired?.Invoke(mqMessage.MchId, null);
break;
...
}
// UI 스레드에서 알림 표시
this.BeginInvoke(new MethodInvoker(delegate
{
TrayNotification frm = new TrayNotification();
frm.showAlert(message, TrayNotification.enmType.Info);
}));
}
메시지 흐름 (시퀀스 다이어그램)
┌────────┐ ┌─────────┐ ┌─────────┐ ┌──────────┐ ┌──────────┐
│ 관리자 │ │ API │ │RabbitMQ │ │ Queue │ │ Worker │
│ PC │ │ Server │ │ Server │ │ │ │ PC │
└───┬────┘ └────┬────┘ └────┬────┘ └────┬─────┘ └────┬─────┘
│ │ │ │ │
│생산지시 등록 │ │ │ │
│────────────>│ │ │ │
│ │ │ │ │
│ │DB 저장 │ │ │
│ │ │ │ │
│ │메시지 발송 │ │ │
│ │─────────────>│ │ │
│ │ │ │ │
│ │ │큐에 저장 │ │
│ │ │────────────>│ │
│ │ │ │ │
│ │ │ │메시지 전달 │
│ │ │ │─────────────>│
│ │ │ │ │
│ │ │ │ 알림 표시 │
│ │ │ │ 데이터 갱신 │
│ │ │ │ │
소요 시간: 메시지 발송부터 클라이언트 수신까지 평균 50~100ms
Exchange 타입과 라우팅
Topic Exchange (현재 시스템)
라우팅 패턴: "사용자번호.#"
┌─────────────────┐
│ Topic Exchange │
│ "alarm.topic" │
└────────┬────────┘
│ 라우팅 키: "WPC001"
│
┌────┴────┬─────────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Queue 1 │ │ Queue 2 │ │ Queue 3 │
│ WPC001 │ │ WPC002 │ │ WPC003 │
│ .# │ │ .# │ │ .# │
└─────────┘ └─────────┘ └─────────┘
✅ 전달 ❌ 불일치 ❌ 불일치
Exchange 타입 비교
| 타입 | 라우팅 방식 | 사용 예시 |
|---|---|---|
| Direct | 정확히 일치 | 특정 PC 1대 |
| Topic | 패턴 매칭 (*, #) | 부서별, 권한별 |
| Fanout | 브로드캐스트 | 전체 공지 |
마치며
Windows 메시지 큐 vs RabbitMQ
| 항목 | Windows 메시지 큐 | RabbitMQ |
|---|---|---|
| 범위 | 단일 프로세스 | 네트워크 전체 |
| 용도 | UI 이벤트 | 시스템 간 통신 |
| 영속성 | 없음 | 있음 (서버 재시작 후에도 유지) |
| 라우팅 | 없음 | Exchange로 유연한 분배 |
핵심 포인트
"서버와 클라이언트를 느슨하게 연결하라"
- 직접 호출: 강한 결합 (한 대 장애 = 전체 영향)
- RabbitMQ: 느슨한 결합 (메시지만 던지고 끝)
이 패턴은 Kafka, Redis Pub/Sub 등 다른 메시징 시스템에도 동일하게 적용됩니다.
이 글이 도움이 되었으면
| 기술블로그에서 자주 보는 글 | 현실의 우리 |
|---|---|
| "우리가 Kafka, RabbitMQ를 선택한 이유" | 입사했더니 이미 깔려있음 |
| "초당 100만 TPS 처리기" | MES는 하루에 수백~몇천 건이면 많은 거 |
| "장애 대응 포스트모템" | 장애나면 블로그 쓸 시간에 고쳐야 함 |
이 글은 레거시 MES 시스템을 하자보수하면서 배운 내용입니다.
화려한 신규 프로젝트가 아니어도, 누군가 만들어둔 시스템을 이해하고 문제를 해결하는 과정에서 배우는 것도 많습니다.
메시지 큐에 대한 개념 글이나 아키텍처 다이어그램은 많지만, 정작 "실무에서 어떻게 쓰는데?" 에 대한 답은 의외로 찾기 어려웠습니다. 비슷한 궁금증을 가졌던 분들께 이 글이 조금이나마 도움이 되었으면 합니다.
감사합니다.