Queue Use Cases

Publisher

A Publisher (producer) is responsible for sending messages to a queue. In Uzumibi, you use the Uzumibi::Queue.send method to send messages.

Typical use cases include:

  • Asynchronous background processing: Return a response to the user immediately while delegating time-consuming tasks (email sending, image conversion, data aggregation, etc.) to the queue.
  • Event notification: Notify another Worker of events that occurred in one Worker (user registration, order completion, etc.) to trigger subsequent processing.
  • Rate limit mitigation: Put requests to external APIs into a queue first, then process them sequentially on the consumer side in a controlled manner.
# Example: Send a welcome email asynchronously after user registration
post "/api/users" do |req, res|
  # User registration processing (completes immediately)
  # ...

  # Delegate email sending to the queue (asynchronous)
  Uzumibi::Queue.send("UZUMIBI_QUEUE", "welcome_email:#{user_email}")

  res.status_code = 201
  res.body = JSON.generate({status: "registered"})
  res
end

Consumer

A Consumer is responsible for receiving and processing messages from the queue.

Note: In a standard Cloudflare project, the web application that sends to a Cloudflare Queue and the process that receives messages from the queue can be defined in the same Worker file. However, in Uzumibi 0.6.x, you need to generate a separate consumer-only application, so please keep this in mind.

In the Ruby code of the consumer application, you prepare a class that inherits from Uzumibi::Consumer and override the on_receive method to implement the processing logic.

Consumers have the following characteristics:

  • Batch processing: Messages are received in batches (up to 10 messages, with a maximum wait time of 5 seconds).
  • Retry functionality: Failed messages can be retried after a specified delay.
  • Acknowledgment: Successfully processed messages are acknowledged with ack! and removed from the queue.
class Consumer < Uzumibi::Consumer
  def on_receive(message)
    debug_console("Processing: #{message.body}")

    # Acknowledge after successful processing
    message.ack!
  end
end

The Message object has the following attributes:

AttributeTypeDescription
message.idStringUnique ID of the message
message.timestampStringTimestamp when the message was sent (ISO 8601 format)
message.bodyStringMessage body
message.attemptsIntegerNumber of delivery attempts so far

The Message object also has the following methods:

MethodDescription
message.ack!Notify that message processing is complete and remove it from the queue
message.retry(delay_seconds: N)Redeliver the message after N seconds