Consumer Implementation

The consumer Worker receives messages from a Cloudflare Queue and processes them. In Uzumibi, you inherit from the Uzumibi::Consumer class and implement processing in the on_receive method.

Project Structure

When the queue feature is enabled, the following file is generated instead of the standard lib/app.rb:

  • lib/consumer.rb - Consumer Ruby code

Additionally, consumer-specific Wasm export functions (uzumibi_initialize_message, uzumibi_start_message) are added to wasm-app/src/lib.rs.

queue-consumer/
├── lib/
│   └── consumer.rb     # Queue consumer processing
├── src/
│   └── index.js        # JS glue code (handles both HTTP and Queue)
├── wasm-app/
│   ├── build.rs        # Compiles both app.rb and consumer.rb
│   └── src/
│       └── lib.rs      # Wasm (exports both HTTP and Queue processing)
├── wrangler.jsonc
└── package.json

The consumer Worker is dedicated to processing Queue messages.

Verifying wrangler.jsonc

The consumer's wrangler.jsonc contains queues.consumers configuration. Double-check that the queue name matches the sender:

{
    "queues": {
        "producers": [
            {
                "binding": "UZUMIBI_QUEUE",
                "queue": "my-app-queue"
            }
        ],
        "consumers": [
            {
                "queue": "my-app-queue",
                "max_batch_size": 10,
                "max_batch_timeout": 5
            }
        ]
    }
}

Implementing the Consumer

Edit lib/consumer.rb to implement the message processing logic:

class Consumer < Uzumibi::Consumer
  # @rbs message: Uzumibi::Message
  def on_receive(message)
    debug_console("[Consumer] Received message: id=#{message.id}, body=#{message.body}, attempts=#{message.attempts}")

    # Process the message
    body = message.body
    debug_console("[Consumer] Processing: #{body}")

    # Acknowledge after successful processing
    if message.attempts > 3
      # Give up and ack after more than 3 retries
      debug_console("[Consumer] Giving up after #{message.attempts} attempts, acknowledging message #{message.id}")
      message.ack!
    else
      # Normal processing
      begin
        process_message(body)
        debug_console("[Consumer] Successfully processed message #{message.id}")
        message.ack!
      rescue => e
        debug_console("[Consumer] Error processing message: retrying in 5 seconds")
        message.retry(delay_seconds: 5)
      end
    end
  end

  def process_message(body)
    # Actual message processing logic
    debug_console("[Consumer] Message content: #{body}")
    # Write specific processing here
    # e.g., calling external APIs, saving data, etc.
  end
end

$CONSUMER = Consumer.new

Code Walkthrough

Inheriting Uzumibi::Consumer

class Consumer < Uzumibi::Consumer
  def on_receive(message)
    # ...
  end
end

Inherit from Uzumibi::Consumer and override the on_receive method. This method is called each time a message is received from the queue.

Message Attributes

The message object (Uzumibi::Message) passed to on_receive provides the following information:

message.id        # Message ID (string)
message.body      # Message body (string)
message.attempts  # Delivery attempt count (integer)
message.timestamp # Send timestamp (ISO 8601 format string)

Acknowledgment (ack!)

message.ack!

Call ack! on a message once processing is complete. Messages that have been acknowledged are removed from the queue.

Retry

message.retry(delay_seconds: 5)

If processing fails, call retry to redeliver the message. Specify the delay time until redelivery in seconds with delay_seconds. The next time on_receive is called, message.attempts will be incremented.

Global Variable $CONSUMER

$CONSUMER = Consumer.new

Assign the consumer instance to the global variable $CONSUMER. This is referenced from the Rust code on the Wasm side, so it must always be set.

Deploying for Verification

Note: With the current Wrangler, Queue communication across multiple projects cannot be verified in the localhost development environment.

Deploy queue-consumer for verification.

First, perform the build:

pnpm install
pnpm run dev
# The HTTP part is empty, so it starts but behavior can't be verified.
# This is just to build the Wasm, so stop it afterwards.

Verifying Operation

Testing

To check the received logs, display logs in the terminal with the following command:

Terminal (queue-consumer):

$ cd queue-consumer
$ npx wrangler tail

 ⛅️ wrangler 4.73.0
───────────────────
Successfully created tail, expires at 2026-03-14T19:19:39Z

In a separate terminal, issue the following curl command to send a message:

curl -X POST -d "Test message from publisher" \
    http://queue-publisher.<ID>.workers.dev/api/send
{"message":"Test message from publisher","status":"accepted"}

Message processing logs will appear in the queue-consumer console:

Queue my-app-queue (1 message) - Ok @ 2026/3/14 22:22:05
  (log) [debug]: [Consumer] Received message: id=25f4ef4ca8b2fdb7055cc5b9XXXXXXXX, body=Test message from publisher, attempts=1
  (log) [debug]: [Consumer] Processing: Test message from publisher
  (log) [debug]: [Consumer] Message content: Test message from publisher
  (log) [debug]: [Consumer] Successfully processed message 25f4ef4ca8b2fdb7055cc5b9XXXXXXXX

Processing status can also be checked from the Cloudflare dashboard under "Workers & Pages" > target Worker > "Logs".

Chapter Summary

Queues enable easy asynchronous processing with Cloudflare Workers + Uzumibi.

When using Queues, the architecture is somewhat special. Please note that with Uzumibi, two Workers are required.