RabbitMQ Work Queues
AMQP 오픈소스 메시지 브로커 다른 MQ 오픈소스보다 오래되었고 다양한 플러그인, 언어로 기능들을 제공합니다.
Advanced Message Queueing Protocol 줄임말로 MQ 오픈소스에 기반한 표준 프로토콜
Quick Start
기본 포트
어드민 포트 : 15672
브로커 포트 : 5672
도커로 시작하기
docker run -d --hostname rabbit --name rabbit -p5672:5672 -p 15672:15672 rabbitmq:3-management
브로커 서버가 죽었을 떄
- 브로커 서버가 죽으면 모든 큐가 다 날아감
- Sub하는 서버는 연결이 끊겨도 일단 계속 재접속을 시도함 -> 다시 브로커 서버가 살아나면 해당 queue 이름으로 listen -> 브로커 서버 매니지 페이지에 바로 queue가 올라옴
메시지 송신과 수신
- 메시지 수신은 socket 서버와 같이 터널이 계속 열려있음
- 송신을 하면 바로 큐에 쌓이며 송신한 이름과 수신한 이름이 브로커에서 같다고 판단되면 바로 수신한테 넘김
- 큐에 송신한 후 수신 서버가 꺼져있다면 큐는 날아가지 않고 대기함
- 나중에 수신 서버가 다시 켜지면 한번에 해당 큐 이름을 가지고 있는 곳으로 다 날아가서 수신됨
만약 ack을 보내지 않으며 consumer가 죽는다면 (채널 닫히거나 연결이 끊기거나 TCP 연결이 끊긴다면) RabbitMQ는 메시지가 완전히 처리 되지 않았다는것을 이해하고 다시 큐에 적재할 것입니다.
만약 같은 시각에 다른 consumer들이 온라인 상태라면 빠르게 다른 consumer에게 재전달해줄것입니다.
consumer가 못 받는 상태라 생각할지라도 강제로 보내버리는 timeout (기본 30분)을 설정할 수 있습니다.
메시지가 손실되지 않는 것은 큰 이점이지만 만약 유저가 메시지를 수신 받기를 원해 버튼을 누르고 수신을 받기전에 꺼버린다면 큐 안에 다시 쌓이게 되어 메모리를 많이 잡아 먹을 것입니다. 이런것들을 디버그 하기 위해 아래와 같은 명령어를 사용합니다.
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
CLI 결과
GUI에서 보면
boolean autoAck = false; 을 사용하면 consumer가 메시지를 처리 중에 강제 종료 되더라도 손실되는 것은 없고 처리하지 못 한 모든 메시지들은 consumer로 재전달될것입니다.
Message Durability
하지만 모든 것들은 RabbitMQ 서버가 죽으면 소용이 없습니다. 메시지가 유실되지 않게하기 위해서는 두가지가 필요합니다.
queue와 메시지에 내구성이 있다고 표시하는 것입니다.
하지만 같은 queue 이름이 브로커 내에 이미 존재하고 있다면 재정의할 수 없습니다.
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
이제 task_queue의 정의 자체는 브로커가 재시작되어도 유실되지 않을것입니다. 메시지도 persistent하게 표시해줘야합니다.
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
이제 큐에 담긴 메시지들도 consumer가 받지 못했다면 브로커가 재시작되어도 유실되지 않고 살아있습니다.
내구성 있는 메시징 보내보기
코드를 작성하고 내구성 표식을 한 메시지를 송신합니다. 이때 MQ 서버는 아직 살아있습니다.
어드민에서 확인해보면 Features에 D 표시가 나오고 메시지의 갯수가 나옵니다. 이제 MQ 서버를 내립니다.
다시 MQ를 키고 Consumer 자바를 실행합니다. 그럼 MQ가 죽으면서 죽었을 메시지가 받아집니다.
Fair dispatch
브로커는 consumer의 처리 속도와 상관없이 큐를 균등하게 보내주고 있습니다.
consumer의 처리 속도에 따라 여러 consumer들에게 메시지를 나눠주고 싶다면 basicQos의 prefetchCount를 1로 설정합니다.
int prefetchCount = 1;
channel.basicQos(prefetchCount);
일반 rabbitmqtt hello world 예제
https://www.rabbitmq.com/tutorials/tutorial-one-java
워크 큐 예제