Working with Acknowledgements and Consumers
Share:
In any distributed system, acknowledging that a message has been processed and can be safely removed from the queue plays a very critical role. It helps in ensuring that no message is lost and every bit of data gets processed. In RabbitMQ, this principle is applied via acknowledgement messages or (Acks). In this chapter, we'll delve into acknowledging messages, requeueing messages, and how to effectively work with consumers. Let's picture a mock movie booking system to facilitate understanding.
Understanding Acknowledgements
Acknowledging messages in RabbitMQ means that a message from the queue has been processed and it can be safely deleted from the queue. By default, when a consumer connects to the queue to consume a message, once RabbitMQ delivers the message to the consumer it immediately marks it for deletion.
Consider the following scenario. We have a queue named 'movie_tickets' which is used to process movie ticket booking requests. Our consumer is a booking processor named 'Alice'.
# Establish connection and channel
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = conn.channel()
# Declare queue
channel.queue_declare(queue='movie_tickets')
# Define a callback to process messages
def process_ticket(ch, method, properties, body):
print(f'Received {body}')
# Set callback function for the consumer
channel.basic_consume(queue='movie_tickets',
auto_ack=True, # enable auto acknowledgement
on_message_callback=process_ticket)
# Start consuming messages
print('Waiting for messages...')
channel.start_consuming()
In the above script, we use the basic_consume
method to consume messages from the 'movie_tickets' queue. The auto_ack=True
parameter means that the messages are automatically acknowledged once they have been delivered.
However, assuming that 'Alice' crashes midway through processing, the ticket booking would then be lost completely because the message was already marked for deletion by RabbitMQ. To make sure that the unprocessed messages are not lost, we should turn off automatic acknowledgments, like so:
# Set callback function for the consumer with no auto acknowledgement
channel.basic_consume(queue='movie_tickets',
auto_ack=False, # disable auto acknowledgement
on_message_callback=process_ticket)
When the auto_ack
parameter is set to False
, it means the consumer has to manually send an acknowledgement message to RabbitMQ once it has done processing. This is done via the basic_ack
method:
def process_ticket(ch, method, properties, body):
print(f'Received {body}')
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(queue='movie_tickets',
auto_ack=False, # disable auto acknowledgement
on_message_callback=process_ticket)
Requeueing Messages
In some cases, a consumer may fail to process a message. In these situations, RabbitMQ allows for messages to be requeued so that they can be tried again. The basic_reject
function is used to reject a message, and it takes delivery_tag
and a requeue
Boolean. When requeue
is True
, the message will be put back in the queue.
def process_ticket(ch, method, properties, body):
print(f'Received {body}')
# If processing fails, requeue the message
ch.basic_reject(delivery_tag = method.delivery_tag, requeue = True)
Managing Consumers
In a real-world application, we may have multiple consumers working on the same queue. RabbitMQ gives you a way to equally distribute messages to multiple consumers. This could be in a round-robin manner or based on the number of unacknowledged messages from each consumer.
Let's imagine two booking processors 'Alice' and 'Bob' consuming from the 'movie_tickets' queue. RabbitMQ handles this automatically:
# 'movie_tickets' consumer 'Alice'
channel.basic_consume(queue='movie_tickets',
auto_ack=False,
on_message_callback=process_ticket)
# 'movie_tickets' consumer 'Bob'
channel.basic_consume(queue='movie_tickets',
auto_ack=False,
on_message_callback=process_ticket)
print('Waiting for messages...')
channel.start_consuming()
If we want to distribute the messages based on the number of unacknowledged messages from each consumer, we can use the basic_qos
function with the prefetch_count=1
parameter. This tells RabbitMQ not to give more than one message to a worker at a time.
# Assign work on the basis of acks pending
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='movie_tickets',
auto_ack=False,
on_message_callback=process_ticket)
print('Waiting for messages...')
channel.start_consuming()
In conclusion, the mechanisms of acknowledgements, requeueing, and managing consumers in RabbitMQ are critical in ensuring that messages are handled properly and efficiently in a distributed system. By understanding and utilizing these features, a more robust and reliable application can be built.
0 Comment
Sign up or Log in to leave a comment