Use quorum queues in RabbitMQ with the Ruby client

RabbitMQ has a newer queue type called the "quorum queue", which differs considerably from classic queues. Please read the docs below.

RabbitMQ Quorum Queues

RabbitMQ Quorum Queues explained

Quorum queues come with a few special features and restrictions. They cannot be non-durable: the Raft log is always written to disk, so they can never be declared as transient. In the 3.x series they also do not support message priorities. Older releases did not support message TTLs either; TTL support for quorum queues was added in RabbitMQ 3.10.

As the use case for Quorum queues is data safety, they also cannot be declared as exclusive, which would mean they get deleted as soon as the consumer disconnects.

Since all messages in Quorum queues are persistent, the AMQP “delivery-mode” option has no effect on their operation.

How do you use a quorum queue? Here is a sample using the Ruby client "bunny".

First, RabbitMQ must be installed on the system. For instance, mine is Ubuntu 20.04 with RabbitMQ 3.11.5 installed.

 OS PID: 245727
OS: Linux
Uptime (seconds): 86657
Is under maintenance?: false
RabbitMQ version: 3.11.5
RabbitMQ release series support status: supported
Node name: rabbit@nxacloud-bloghost
Erlang configuration: Erlang/OTP 25 [erts-13.1.3] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [jit:ns]
Crypto library: OpenSSL 1.1.1f 31 Mar 2020
Erlang processes: 402 used, 1048576 limit
Scheduler run queue: 1
Cluster heartbeat timeout (net_ticktime): 60

Next, since I am using the bunny client, I have to install it via gem.

 $ gem install bunny

And this is my Ruby version:

 $ ruby -v
ruby 3.1.3p185 (2022-11-24 revision 1a6b16756e) [x86_64-linux]

Write the consumer program as follows.

 require "bunny"

conn = Bunny.new
conn.start

ch = conn.create_channel
x = ch.direct("mydirect")
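# Quorum queues must be durable; the queue type is chosen with the "x-queue-type" argument.
q = ch.queue("directqueue", durable: true, arguments: {"x-queue-type" => "quorum"}).bind(x, routing_key: "test")

# Acknowledge each message manually once it has been handled.
q.subscribe(block: true, manual_ack: true) do |delivery_info, metadata, payload|
  puts "Received #{payload}"
  ch.ack(delivery_info.delivery_tag)
end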

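Please note the key points for the consumer: the queue is declared with durable: true and with the "x-queue-type" argument set to "quorum", and messages are acknowledged manually (manual_ack: true).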
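
The declaration used in the consumer can also be wrapped in a small helper that guards against the restrictions described earlier (no transient and no exclusive quorum queues). This is only a sketch: quorum_queue_options is a hypothetical name of my own, not part of bunny, and the checks merely mirror on the client side what the broker would reject anyway.

```ruby
# Hypothetical helper (not part of Bunny): builds the options hash for
# declaring a quorum queue and rejects settings quorum queues do not allow.
def quorum_queue_options(extra = {})
  opts = { durable: true }.merge(extra)
  raise ArgumentError, "quorum queues must be durable" unless opts[:durable]
  raise ArgumentError, "quorum queues cannot be exclusive" if opts[:exclusive]

  # Keep any caller-supplied arguments, but always force the queue type.
  args = (opts[:arguments] || {}).merge("x-queue-type" => "quorum")
  opts.merge(arguments: args)
end

# Intended use with a Bunny channel (needs a running broker):
#   q = ch.queue("directqueue", quorum_queue_options)
```

Passing exclusive: true or durable: false raises an ArgumentError before anything is sent to RabbitMQ, which makes the mistake easier to spot than a channel-level error from the broker.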

Now run this consumer in a terminal, then open another terminal and write the producer program as follows.

 require "bunny"

conn = Bunny.new
conn.start

ch = conn.create_channel
ch.confirm_select
x = ch.direct("mydirect")

20.times do |s|
  x.publish(s.to_s, routing_key: "test")
end

ch.wait_for_confirms

Now run this producer, which publishes 20 messages (just increasing numbers) to RabbitMQ. These messages will be consumed by the consumer.

Go back to the consumer's terminal and you will see the output:
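
The producer enables publisher confirms with confirm_select and then blocks in wait_for_confirms, but it ignores the return value. In bunny, wait_for_confirms returns true only if every outstanding message was positively acknowledged. A minimal sketch that checks this could look as follows; publish_confirmed is a hypothetical helper name, and channel and exchange are assumed to behave like Bunny::Channel and Bunny::Exchange.

```ruby
# Hypothetical helper: publish every payload and fail loudly if the broker
# did not positively confirm all of them. `channel` and `exchange` are
# expected to behave like Bunny::Channel and Bunny::Exchange.
def publish_confirmed(channel, exchange, payloads, routing_key:)
  channel.confirm_select
  payloads.each { |body| exchange.publish(body, routing_key: routing_key) }

  # wait_for_confirms blocks until the broker has answered for every message.
  unless channel.wait_for_confirms
    raise "some messages were not confirmed by the broker"
  end
  payloads.size
end

# Intended use (needs a running broker):
#   publish_confirmed(ch, x, (0...20).map(&:to_s), routing_key: "test")
```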

 Received 0
Received 1
...

They are working!

Finally, you can run this command to see the Raft status of that quorum queue.

 $ sudo rabbitmq-queues quorum_status directqueue
Status of quorum queue directqueue on node rabbit@nxacloud-bloghost ...
┌──────────────────────────┬────────────┬───────────┬──────────────┬────────────────┬──────┬─────────────────┐
│ Node Name │ Raft State │ Log Index │ Commit Index │ Snapshot Index │ Term │ Machine Version │
├──────────────────────────┼────────────┼───────────┼──────────────┼────────────────┼──────┼─────────────────┤
│ rabbit@nxacloud-bloghost │ leader │ 196 │ 196 │ undefined │ 1 │ 3 │
└──────────────────────────┴────────────┴───────────┴──────────────┴────────────────┴──────┴─────────────────┘
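
If you want to check the leader from a script, the table printed above can be parsed with a few lines of Ruby. This is only a sketch: parse_quorum_status is a hypothetical helper of my own, it assumes the box-drawing layout shown above, and the sample text is shortened to three columns.

```ruby
# Hypothetical helper: turn the text table printed by
# `rabbitmq-queues quorum_status <queue>` into an array of hashes,
# one per cluster member, keyed by the column titles.
def parse_quorum_status(text)
  rows = text.each_line
             .map(&:strip)
             .select { |line| line.start_with?("│") }       # skip border lines
             .map { |line| line.split("│", -1)[1...-1].map(&:strip) }
  header, *members = rows
  return [] if header.nil?

  members.map { |cells| header.zip(cells).to_h }
end

# Shortened sample in the same layout as the real output.
sample = <<~TABLE
  ┌──────────────────────────┬────────────┬───────────┐
  │ Node Name                │ Raft State │ Log Index │
  ├──────────────────────────┼────────────┼───────────┤
  │ rabbit@nxacloud-bloghost │ leader     │ 196       │
  └──────────────────────────┴────────────┴───────────┘
TABLE

members = parse_quorum_status(sample)
leader = members.find { |m| m["Raft State"] == "leader" }
puts leader["Node Name"]
```

On a single-node setup like mine there is only one member, so it is always the leader; in a real cluster the same lookup tells you which node currently leads the queue.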