What is RabbitMQ?

RabbitMQ is a popular open-source message broker that implements the AMQP (Advanced Message Queuing Protocol) standard.

Channel

A channel is a separate communication channel within a single connection. Channels allow multiple concurrent exchanges to be executed in parallel, providing a way to separate different parts of your application.

Each channel has its own set of resources, such as queues, exchanges, and bindings, which are independent of the resources used by other channels in the same connection. This allows you to manage concurrency and improve performance by isolating different parts of your application into separate channels.

For example, you might create one channel for sending messages and another channel for receiving messages, or create separate channels for different types of messages or different parts of your application. By using multiple channels, you can take advantage of the scalability and performance benefits of AMQP while also making it easier to manage your application.

Channel vs connection

In RabbitMQ, a connection represents a network connection to the RabbitMQ broker. When a client connects to RabbitMQ, it establishes a TCP connection to the broker. This connection remains open until the client explicitly closes it or the broker closes it due to a network error or a timeout. A connection is created using the AMQP protocol, and it is responsible for authentication, connection handling, and connection-level flow control.

On the other hand, a channel is a virtual connection inside a connection that allows multiple logical connections to be multiplexed over a single physical connection. When a client establishes a connection to RabbitMQ, it can create one or more channels inside that connection. Each channel is a separate AMQP session that can be used to publish or consume messages, declare queues and exchanges, and bind queues to exchanges.

The main difference between a connection and a channel is that a connection represents a physical connection to the broker, whereas a channel represents a logical connection within that physical connection. Channels allow multiple AMQP operations to be multiplexed over a single network connection, which can help reduce the overhead of establishing multiple network connections.

In summary, a connection in RabbitMQ represents a physical network connection to the broker, while a channel represents a logical connection within that physical connection, allowing multiple AMQP operations to be multiplexed over a single network connection.

Queue

Durable Queue

A queue that survives broker restarts.Queue metadata (not the messages themselves, unless explicitly persisted) is stored on disk. Messages in the queue can be persisted if they are marked as persistent.

Transient Queues

A transient queue refers to a queue that is not durable, meaning it is not persistent and will not survive a broker restart. Transient queues are typically used for short-lived or temporary messaging scenarios, where persistence and reliability are not critical. which has performance higher compared to normal queue

Exclusive Queue

A queue that is private to the connection that declared it and is deleted when the connection closes.

Lazy Queue

A queue designed to handle large volumes of messages by storing them on disk instead of memory.

  • Created with a x-queue-mode argument set to lazy.
  • Messages are moved to disk as soon as possible.
  • Helps prevent memory overload in scenarios with high message backlogs.

Quorum Queue

A replicated queue type that uses the Raft consensus algorithm for high availability and consistency.

  • Messages and metadata are replicated across multiple nodes in a cluster.
  • Provides stronger data safety guarantees than classic mirrored queues.
  • Supports sharding for large-scale message handling.
Queue TypePersistentReplicatedSpecial Feature
Durable QueueYesNoSurvives restarts
Transient QueueNoNoLost after restart
Exclusive QueueOptionalNoTied to a single connection
Auto-Delete QueueOptionalNoDeleted when last consumer disconnects
Temporary QueueOptionalNoAuto-generated, often exclusive
Lazy QueueYesNoStores messages on disk to save memory
Quorum QueueYesYesHigh availability using Raft
Classic Mirrored QueueYesYesLegacy replication
Dead Letter QueueOptionalOptionalFor undeliverable messages
Header QueueOptionalOptionalMatches on message headers

Exchanges

In RabbitMQ, an exchange is a message routing agent that receives messages from producers and routes them to queues based on message properties such as the routing key. When a producer sends a message to an exchange, it is up to the exchange to route the message to one or more queues.

There are four types of exchanges in RabbitMQ:

  1. Direct exchange: Messages are routed to queues based on the exact match between the routing key of the message and the routing key of the queue.

  2. Fanout exchange: Messages are routed to all the queues bound to the exchange. It ignores the routing key and sends messages to all the queues that are bound to the exchange.

  3. Topic exchange: Messages are routed to queues based on pattern matching between the routing key of the message and the routing key of the queue. It uses wildcards to match the routing key.

  4. Headers exchange: Messages are routed to queues based on header values instead of routing keys. It is rarely used, and its functionality is similar to the topic exchange.

  5. x-delayed-message → is a custom exchange type in RabbitMQ that allows you to delay messages before they are delivered to a queue. This exchange type is not included in RabbitMQ by default and must be installed as a plugin.

    When you create an x-delayed-message exchange, you can set a delay time for messages using the x-delay header. The exchange will hold the message for the specified delay time and then deliver it to the appropriate queue. This is useful in scenarios where you want to delay the delivery of a message until a certain time or after a certain event has occurred.

Example

const amqp = require('amqplib');
 
async function setup() {
  // Connect to RabbitMQ
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
 
  // Install the delayed message exchange plugin (if necessary)
  await channel.assertExchange('amq.delayed', 'x-delayed-message', {
    durable: true,
    arguments: {
      'x-delayed-type': 'direct'
    }
  });
 
  // Create the queue and bind it to the delayed message exchange
  const queueName = 'my-queue';
  const routingKey = 'my-routing-key';
 
  await channel.assertQueue(queueName, { durable: true });
 
  await channel.bindQueue(queueName, 'amq.delayed', routingKey);
 
  // Consumer function to handle incoming messages
  const consumerFunction = (msg) => {
    console.log(`Received message: ${msg.content.toString()}`);
    channel.ack(msg);
  };
 
  // Consume messages from the queue
  channel.consume(queueName, consumerFunction);
 
  // Producer function to send messages with a delay
  const producerFunction = async (message, delayMs) => {
    const headers = { 'x-delay': delayMs };
    const buffer = Buffer.from(message);
 
    await channel.publish('amq.delayed', routingKey, buffer, { headers });
 
    console.log(`Sent message "${message}" with delay of ${delayMs}ms`);
  };
 
  // Send some messages with a delay
  await producerFunction('Hello World!', 5000);
  await producerFunction('Delayed message!', 10000);
}
 
setup();

Application of Exchange

Let’s say you have a distributed system that consists of multiple microservices. Each microservice has its own queue, and they communicate with each other through messaging.

When a microservice wants to send a message to another microservice, it can publish the message to an exchange instead of directly sending it to the other microservice’s queue. The exchange is responsible for routing the message to the appropriate queue based on the message’s routing key.

For example, let’s say you have a microservice that handles user authentication, and another microservice that handles user orders. When a user logs in, the authentication microservice can publish a message to the exchange with a routing key of “user.login”. The exchange can then route this message to the order microservice’s queue, which is bound to the exchange with a matching routing key.

If you didn’t use an exchange, the authentication microservice would have to know the exact name of the order microservice’s queue and send the message directly to that queue. This would tightly couple the microservices and make it harder to make changes to the system in the future.

By using an exchange, you can create a loosely coupled system where microservices don’t need to know about each other’s queues. This makes it easier to add new microservices or change the routing rules of messages without affecting other parts of the system.

Durablity

RabbitMQ provides durability by persisting messages and metadata to disk. This ensures that messages are not lost in case of a server failure.if it durable to true it will presist if we set false it will not until it reach threshold

Prefetch

RabbitMQ uses prefetch to control the amount of messages a consumer can consume at once. Prefetch specifies the number of unacknowledged messages that can be in-flight before the broker stops delivering more messages to the consumer. This avoids overloading a consumer with too many messages at once.

Amqp-connection-manager vs Amqplib

amqp-connection-manager and amqplib are both Node.js libraries for working with RabbitMQ, but they have different purposes and use cases.

amqplib is a low-level RabbitMQ client library that provides a thin wrapper around the RabbitMQ API. It allows you to send and receive messages, create and manage exchanges and queues, and interact with other RabbitMQ features. amqplib provides a direct and flexible interface to RabbitMQ, and is a good choice if you need complete control over your RabbitMQ interactions.

amqp-connection-manager, on the other hand, is a higher-level library that provides connection management and channel pooling. It uses amqplib under the hood, but adds features like connection retry, connection throttling, and automatic channel recovery. amqp-connection-manager is a good choice if you want to simplify your RabbitMQ code and reduce the chance of connection errors, or if you need to handle multiple connections and channels.

In general, if you need fine-grained control over your RabbitMQ interactions, or if you have a small number of connections and channels, amqplib is a good choice. If you have a large number of connections and channels, or if you want to simplify your RabbitMQ code and reduce the chance of connection errors, amqp-connection-manager is a better choice.

What are the best practice regarding channel and connection

  1. Use a single connection per application instance: It’s a good practice to use a single connection for an entire application instance. Creating multiple connections can lead to resource wastage, and can make it difficult to manage and monitor connections.
  2. Use a connection pool: Creating and closing connections can be expensive, so it’s recommended to use a connection pool. Connection pools can be used to manage the number of connections and can help improve performance.
  3. Use a separate channel for each thread: When creating a multithreaded application, use a separate channel for each thread instead of sharing a single channel. Sharing a single channel between threads can lead to contention issues and can cause the application to become unstable.
  4. Close channels when they’re no longer needed: It’s a good practice to close channels when they’re no longer needed. This helps to reduce the number of open channels and frees up resources.
  5. Use a lightweight protocol: RabbitMQ provides AMQP, which is a lightweight protocol that is designed for message queuing. Using a lightweight protocol can help improve performance and reduce the overhead of managing connections and channels.
  6. Use transactional channels: When sending multiple messages, it’s a good practice to use transactional channels. This ensures that all messages are either sent successfully or not sent at all.
  7. Use a connection heartbeat: RabbitMQ provides a connection heartbeat mechanism that can be used to detect network failures. It’s a good practice to use connection heartbeat to ensure that the application can recover from network failures.
  8. Use connection and channel events: RabbitMQ provides connection and channel events that can be used to monitor and manage connections and channels. Using these events can help improve the reliability and performance of the application.

How to Handle failures

If we want to retry if the consumer get failed when processing the data we can have 2 way 1. nack 2. reject

Difference between them

The main difference between nack (negative acknowledgement) and reject in RabbitMQ is how they handle message rejection and requeuing:

  1. nack (channel.nack):
    • nack is used to negatively acknowledge a message and reject it.
    • It allows you to control whether the message should be requeued or discarded.
    • The method signature is channel.nack(message, allUpTo, requeue).
    • The message parameter represents the message being rejected.
    • The allUpTo parameter is a boolean that indicates whether all unacknowledged messages prior to the given message should also be rejected.
    • The requeue parameter is a boolean that determines whether the rejected message should be requeued or discarded.
    • With channel.nack, you have more control over requeuing behavior and can choose whether to discard the message or requeue it for retry.
  2. reject (channel.reject):
    • reject is used to reject a message without acknowledging it.
    • When a message is rejected using reject, it can optionally be requeued based on the requeue parameter.
    • The method signature is channel.reject(message, requeue).
    • The message parameter represents the message being rejected.
    • The requeue parameter is a boolean that determines whether the rejected message should be requeued or discarded.
    • By default, if requeue is set to true, the message will be requeued for future delivery. If set to false, the message will be discarded.

In summary, nack provides more flexibility by allowing you to explicitly control requeuing behavior (requeue or discard), whereas reject gives you the option to requeue the message based on the requeue parameter. Both methods can be used to handle message rejection and retry scenarios in RabbitMQ, depending on your specific requirements.

Rabbitmq simulator tool to playaround with it

Unack msg

In RabbitMQ, messages are marked as unacknowledged (unack) when a consumer receives a message but hasn’t sent an acknowledgment back to RabbitMQ yet. Unacknowledged messages remain in the queue and are not re-delivered to other consumers until they are either acknowledged or the connection with the consumer is closed.

Plugin

Shovel Plugin  RabbitMQ plugin that unidirectionally moves messages from a source to a destination. rabbitmq

Internal

Protocol : AMQP

  • AMQP is a wire-level protocol to standardize message queueing and messaging between clients and servers.
  • It ensures interoperability: any client that speaks AMQP can talk to any broker that speaks AMQP.
  • It defines both:
    • A messaging model (how queues, exchanges, bindings work).
    • A binary protocol (how actual bytes are sent over the wire).

. Connections and Channels

  • A TCP connection is established between client and server.
  • Over one connection, multiple channels can be multiplexed (lightweight virtual connections).
  • Each operation (send/receive) happens on a channel, not directly on a connection.
  • Helps in performance, thread-safety, and firewall friendliness.

2. Framing

Data is sent as frames:

  • Frame structure:
    [Frame header][Payload][Frame end]
  • Types of Frames:
    • Method frame: “commands” (e.g., publish a message).
    • Content header frame: metadata about the message.
    • Content body frame: actual message payload.
    • Heartbeat frame: keep connection alive.

3. Commands and Classes

  • The protocol is designed as commands grouped into classes.
  • Example classes: Connection, Channel, Exchange, Queue, Basic, Transaction.
  • Each class has methods: e.g., Basic.Publish, Basic.Consume, Queue.Declare

Structure of a Frame

0        1        3         7             (size+7)    (size+8)
+--------+--------+---------+-------------+----------------+
| type   |channel | payload size |   payload   | frame-end |
+--------+--------+---------+-------------+----------------+

FieldSizeDescription
Type1 byteWhat kind of frame? (method, header, body, heartbeat)
Channel2 bytesChannel number (so we know which “virtual connection” it belongs to)
Payload Size4 bytesHow many bytes of actual data (payload)?
PayloadN bytesDepends on frame type (method, header, content, etc.)
Frame End1 byteAlways 0xCE (206 decimal) - special marker for end of frame
  • Method frame Contains commands (e.g., publish, consume, ack)
  • Header frame Contains metadata about the message (e.g., properties, con)
  • Heartbeat Frame Special lightweight frame to check connection health

When a producer sends a message, 3 frames are sent in a strict sequence:

  1. Method Frame (Basic.Publish) → “Hey broker, I’m sending a message to Exchange X”
  2. Header Frame → “The message properties and size are…”
  3. Body Frame(s) → “Here’s the actual message content (maybe split into multiple frames if big)“

Heartbeats

  • Detect dead connections faster than TCP/IP timeouts.

  • Avoid keeping ghost connections when a client or server crashes silently.

  • Maintain activity through firewalls or NATs (which often kill idle TCP connections).

  • During connection negotiation, client and server agree on a heartbeat interval (like, say, every 10 seconds).

  • If no data (method/content frames) has been sent within the heartbeat interval:

    • Each peer MUST send a heartbeat frame (frame type 8).
  • If a peer doesn’t receive any frame for a certain multiple of heartbeat time, it can assume the other side is dead and close the connection.

  • Any frame (not just heartbeat) also resets the timer

ACK and UnAck

It uses delivery Tag and Delivery-tag is scoped per channel. (Not per connection!)

  1. Client sends: Basic.Consume(queue=“taskqueue”, autoAck=false)
  2. RabbitMQ delivers a message:
    • Payload = “Process Order #123”
    • Delivery-tag = 57
  3. Client processes message.
  4. Client sends back Basic.Ack(delivery-tag=57).
  5. RabbitMQ marks message #57 as “acknowledged” and deletes it from memory/disk.

If instead: 4. Client sends Basic.Reject(delivery-tag=57, requeue=true), 5. RabbitMQ will requeue the message back into the queue.

  • Each channel (session) has a delivery-tag counter.
  • Each unacked message is stored in an Unacked Message Map, keyed by delivery-tag.
  • When ACK comes:
    • Lookup delivery-tag in the map,
    • If found, remove the entry,
    • If not found (e.g., double-ack or wrong ack), protocol error.
  • When connection/channel closes:
    • All unacked messages are either requeued (default) or dropped (depending on settings).

When connection get closed

  • RabbitMQ detected a protocol-level error (like:
    • Double ack
    • Nack on wrong tag
    • Bad frame sequence
    • Invalid operation on channel)
  • RabbitMQ force-closes the channel (protocol rule).

Blog

  1. how does elrang does scheduling

Resources

  1. https://www.cloudamqp.com/blog/part1-rabbitmq-best-practice.html
  2. https://www.cloudamqp.com/blog/part2-rabbitmq-best-practice-for-high-performance.html
  3. What we learned form running 1k queue Cloudamp
    1. Keep connection and channle as low as possible
    2. seprate connection for pub and consume4
    3. dont have too large queue 10k msg
    4. Enable lazy queue (loaded on ram when needed)
    5. rabbitmq sharding
    6. limited use on priroity queue
    7. adjust prefetch (cloudampq default 1k )
  4. Explain basic and how to handle error case
  5. https://www.cloudamqp.com/blog/when-to-use-rabbitmq-or-apache-kafka.html
  6. https://youtu.be/HzPOQsMWrGQ

https://github.com/sensu-plugins/sensu-plugins-rabbitmq

Frame 1258: 157 bytes on wire (1256 bits), 157 bytes captured (1256 bits)

Encapsulation type: Ethernet (1)

Arrival Time: Apr 23, 2025 18:23:49.478206000 IST

[Time shift for this packet: 0.000000000 seconds]

Epoch Time: 1745412829.478206000 seconds

[Time delta from previous captured frame: 0.000581000 seconds]

[Time delta from previous displayed frame: 0.000581000 seconds]

[Time since reference or first frame: 11.195346000 seconds]

Frame Number: 1258

Frame Length: 157 bytes (1256 bits)

Capture Length: 157 bytes (1256 bits)

[Frame is marked: False]

[Frame is ignored: False]

[Protocols in frame: eth:ethertype:ip:tcp:amqp]

[Coloring Rule Name: TCP]

[Coloring Rule String: tcp]

Ethernet II, Src: aa:aa:aa:aa:aa:aa (aa:aa:aa:aa:aa:aa), Dst: 12:e4:35:52:51:8a (12:e4:35:52:51:8a)

Internet Protocol Version 4, Src: 172.20.35.39, Dst:

Transmission Control Protocol, Src Port: 43396, Dst Port: 5672, Seq: 554, Ack: 680, Len: 91

Advanced Message Queueing Protocol

Type: Method (1)

Channel: 6

Length: 5

Class: Channel (20)

Method: Open (10)

Arguments

Advanced Message Queueing Protocol

Type: Method (1)

Channel: 7

Length: 5

Class: Channel (20)

Method: Open (10)

Arguments

Advanced Message Queueing Protocol

Type: Method (1)

Channel: 8

Length: 5

Class: Channel (20)

Method: Open (10)

Arguments

Advanced Message Queueing Protocol

Type: Method (1)

Channel: 9

Length: 5

Class: Channel (20)

Method: Open (10)

Arguments

Advanced Message Queueing Protocol

Type: Method (1)

Channel: 10

Length: 5

Class: Channel (20)

Method: Open (10)

Arguments

Advanced Message Queueing Protocol

Type: Method (1)

Channel: 11

Length: 5

Class: Channel (20)

Method: Open (10)

Arguments

Advanced Message Queueing Protocol

Type: Method (1)

Channel: 12

Length: 5

Class: Channel (20)

Method: Open (10)

Arguments