BlogQueuesRabbitMQ with MQTT for IoT: Building a Scalable Device Messaging Platform

RabbitMQ with MQTT for IoT: Building a Scalable Device Messaging Platform

Adrian Silaghi
Adrian Silaghi
April 6, 2026
12 min read
1 views
#rabbitmq #mqtt #iot #messaging #devices #telemetry #europe

The Internet of Things is no longer a buzzword. It is a reality powering smart factories, connected vehicles, precision agriculture, and building automation systems worldwide. At the heart of every IoT deployment lies a critical challenge: how do you reliably collect data from thousands (or millions) of constrained devices, route it to backend services, and act on it in real time?

The answer for many production IoT systems is MQTT on the device side and AMQP on the backend side, with RabbitMQ acting as the polyglot broker bridging both worlds. This guide walks you through building a scalable device messaging platform using RabbitMQ's native MQTT plugin.

Why MQTT Matters for IoT

MQTT (Message Queuing Telemetry Transport) was designed in 1999 for oil pipeline monitoring over satellite links. That heritage makes it uniquely suited for IoT:

  • Lightweight binary protocol: A minimal MQTT CONNECT packet is just 14 bytes. Compare that to HTTP headers alone, which typically exceed 200 bytes. For battery-powered sensors transmitting every 30 seconds, this difference is the difference between months and weeks of battery life.
  • Low bandwidth: Fixed 2-byte header with no verbose text framing. Ideal for cellular (NB-IoT, LTE-M) and LoRaWAN connections where every byte costs money.
  • Persistent sessions: Devices can disconnect (e.g., cellular network outage) and reconnect later. The broker queues messages during the offline period and delivers them when the device returns. No data loss.
  • Last Will and Testament (LWT): The broker automatically publishes a pre-configured message if a device disconnects ungracefully, enabling instant offline detection.
  • Three QoS levels: Fine-grained delivery guarantees per message.

MQTT Quality of Service Levels

QoS Level Name Guarantee Use Case
0 At most once Fire and forget. No acknowledgement. High-frequency sensor readings where occasional loss is acceptable (temperature every 5s)
1 At least once Acknowledged delivery. May produce duplicates. Telemetry that must arrive but duplicates are harmless (GPS positions, energy meter readings)
2 Exactly once Four-step handshake ensures single delivery. Critical commands and billing events (valve open/close, payment transactions)

For most IoT telemetry workloads, QoS 1 is the sweet spot. QoS 0 is too lossy for data you care about, and QoS 2 doubles the round-trip overhead (four packets instead of two).

RabbitMQ as a Polyglot Broker

Here is what makes RabbitMQ special for IoT: it speaks multiple protocols simultaneously. While dedicated MQTT brokers like Mosquitto or EMQX only speak MQTT, RabbitMQ natively supports AMQP 0-9-1, MQTT 3.1.1/5.0, and STOMP on the same broker instance.

This means your IoT devices can publish telemetry over MQTT while your backend microservices consume that same data over AMQP with full routing, dead-letter queues, and acknowledgement semantics. No bridge, no proxy, no translation layer. One broker, two protocol worlds.

Architecture Overview

A production IoT messaging platform built on RabbitMQ typically follows this flow:

┌─────────────────────────────────────────────────────────────┐
│                      IoT Device Layer                       │
│                                                             │
│  [Sensor A] [Sensor B] [Gateway C] [Actuator D] [Camera E] │
│      │          │           │            │           │      │
│      └──────────┴─────┬─────┴────────────┴───────────┘      │
│                       │ MQTT (TLS, port 8883)               │
└───────────────────────┼─────────────────────────────────────┘
                        ▼
┌───────────────────────────────────────────────────────────────┐
│                    RabbitMQ Broker                             │
│                                                               │
│  ┌─────────────┐    ┌──────────────┐    ┌──────────────────┐  │
│  │ MQTT Plugin │───▶│ amq.topic    │───▶│ Telemetry Queue  │  │
│  │ (devices)   │    │ (exchange)   │    │ Analytics Queue  │  │
│  └─────────────┘    │              │    │ Alerts Queue     │  │
│                     └──────────────┘    └──────────────────┘  │
│                                               │               │
└───────────────────────────────────────────────┼───────────────┘
                                                │ AMQP
                                                ▼
┌───────────────────────────────────────────────────────────────┐
│                   Backend Services Layer                       │
│                                                               │
│  [Telemetry Ingester] [Analytics Engine] [Alert Processor]    │
│         │                    │                  │              │
│         ▼                    ▼                  ▼              │
│  [TimescaleDB]        [ClickHouse]       [PagerDuty/Email]    │
└───────────────────────────────────────────────────────────────┘

Devices connect via MQTT over TLS. RabbitMQ's MQTT plugin translates the messages into its internal AMQP model. Backend services consume from AMQP queues with proper routing, retries, and dead-letter handling. Each service gets exactly the messages it cares about.

MQTT Topic Mapping in RabbitMQ

Understanding how RabbitMQ maps MQTT topics to its AMQP exchange/routing-key model is essential for designing your topic hierarchy.

The Translation Rules

When an MQTT client publishes to a topic like devices/sensor-42/telemetry/temperature, RabbitMQ does the following:

  • The message is published to the amq.topic exchange (the default MQTT exchange)
  • The MQTT topic separator / is translated to . (AMQP routing key separator)
  • The routing key becomes devices.sensor-42.telemetry.temperature
  • MQTT wildcards + and # map to AMQP wildcards * and #
MQTT Topic / Filter AMQP Routing Key / Binding Description
devices/sensor-42/temp devices.sensor-42.temp Exact topic publish
devices/+/temp devices.*.temp Single-level wildcard: any device's temperature
devices/# devices.# Multi-level wildcard: all device messages
devices/+/alerts/# devices.*.alerts.# Combined: alerts from any device, any subtopic

Recommended Topic Hierarchy for IoT

# Pattern: {region}/{site}/{device-type}/{device-id}/{data-type}
eu-west/factory-berlin/plc/plc-001/telemetry
eu-west/factory-berlin/plc/plc-001/status
eu-west/factory-berlin/sensor/temp-014/reading
eu-west/warehouse-munich/camera/cam-03/event

# Commands (device subscribes):
eu-west/factory-berlin/plc/plc-001/command

This hierarchical structure lets AMQP consumers bind with wildcards. A telemetry ingester binds to eu-west.#.telemetry to receive all telemetry. An alert service binds to #.event to catch every event. A site dashboard binds to eu-west.factory-berlin.# to see everything from one factory.

Code Examples: Device to Backend

Python MQTT Publisher (Device Side)

This example simulates an IoT sensor publishing temperature and humidity telemetry using the paho-mqtt library:

import json
import time
import ssl
import paho.mqtt.client as mqtt

# Connection settings
BROKER_HOST = "your-queue.danubedata.ro"
BROKER_PORT = 8883  # MQTT over TLS
DEVICE_ID = "sensor-042"
USERNAME = "device-sensor-042"
PASSWORD = "device-secret-token"

# Topic hierarchy
TELEMETRY_TOPIC = f"eu-west/factory-berlin/sensor/{DEVICE_ID}/telemetry"
STATUS_TOPIC = f"eu-west/factory-berlin/sensor/{DEVICE_ID}/status"


def on_connect(client, userdata, flags, rc, properties=None):
    if rc == 0:
        print(f"[{DEVICE_ID}] Connected to broker")
        # Subscribe to commands for this device
        cmd_topic = f"eu-west/factory-berlin/sensor/{DEVICE_ID}/command"
        client.subscribe(cmd_topic, qos=1)
    else:
        print(f"[{DEVICE_ID}] Connection failed: {rc}")


def on_message(client, userdata, msg):
    """Handle incoming commands from backend."""
    command = json.loads(msg.payload)
    print(f"[{DEVICE_ID}] Received command: {command}")
    if command.get("action") == "set_interval":
        print(f"  Updating read interval to {command['value']}s")


def create_client():
    client = mqtt.Client(
        client_id=DEVICE_ID,
        protocol=mqtt.MQTTv5,
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
    )

    # Authentication
    client.username_pw_set(USERNAME, PASSWORD)

    # TLS encryption (mandatory for production)
    client.tls_set(
        ca_certs="/etc/ssl/certs/ca-certificates.crt",
        tls_version=ssl.PROTOCOL_TLSv1_2,
    )

    # Last Will: notify if device disconnects unexpectedly
    will_payload = json.dumps({
        "device_id": DEVICE_ID,
        "status": "offline",
        "timestamp": int(time.time()),
    })
    client.will_set(STATUS_TOPIC, will_payload, qos=1, retain=True)

    client.on_connect = on_connect
    client.on_message = on_message

    return client


def read_sensor():
    """Simulate sensor reading."""
    import random
    return {
        "temperature_c": round(20 + random.uniform(-5, 15), 2),
        "humidity_pct": round(40 + random.uniform(0, 40), 1),
        "battery_v": round(3.3 - random.uniform(0, 0.5), 2),
    }


def main():
    client = create_client()
    client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)
    client.loop_start()

    try:
        while True:
            reading = read_sensor()
            payload = json.dumps({
                "device_id": DEVICE_ID,
                "timestamp": int(time.time()),
                "readings": reading,
            })

            # QoS 1: at-least-once delivery
            result = client.publish(
                TELEMETRY_TOPIC,
                payload,
                qos=1,
                retain=False,
            )
            result.wait_for_publish()
            print(f"[{DEVICE_ID}] Published: {reading}")

            time.sleep(30)  # Transmit every 30 seconds
    except KeyboardInterrupt:
        # Graceful disconnect: publish online status then disconnect
        client.publish(STATUS_TOPIC, json.dumps({
            "device_id": DEVICE_ID,
            "status": "offline",
            "timestamp": int(time.time()),
        }), qos=1, retain=True)
        client.disconnect()


if __name__ == "__main__":
    main()

Node.js AMQP Consumer (Backend Side)

This backend service consumes telemetry from the same RabbitMQ broker over AMQP, processes the data, and stores it:

const amqplib = require("amqplib");

const AMQP_URL = "amqps://backend-svc:secret@your-queue.danubedata.ro:5671";
const QUEUE_NAME = "telemetry-ingester";
const BINDING_PATTERN = "eu-west.#.telemetry"; // All EU telemetry

async function startConsumer() {
  const connection = await amqplib.connect(AMQP_URL);
  const channel = await connection.createChannel();

  // Prefetch: process 50 messages at a time
  await channel.prefetch(50);

  // Declare a durable queue for this service
  await channel.assertQueue(QUEUE_NAME, {
    durable: true,
    arguments: {
      "x-dead-letter-exchange": "dlx.telemetry",
      "x-message-ttl": 86400000, // 24h TTL
    },
  });

  // Bind to amq.topic (where MQTT messages land)
  await channel.bindQueue(QUEUE_NAME, "amq.topic", BINDING_PATTERN);

  console.log(`[Ingester] Waiting for telemetry on: ${BINDING_PATTERN}`);

  channel.consume(QUEUE_NAME, async (msg) => {
    if (!msg) return;

    try {
      const payload = JSON.parse(msg.content.toString());
      const routingKey = msg.fields.routingKey;

      // Parse routing key to extract metadata
      // eu-west.factory-berlin.sensor.sensor-042.telemetry
      const parts = routingKey.split(".");
      const region = parts[0];
      const site = parts[1];
      const deviceType = parts[2];
      const deviceId = parts[3];

      console.log(
        `[Ingester] ${deviceId} @ ${site}: ` +
        `temp=${payload.readings.temperature_c}C, ` +
        `humidity=${payload.readings.humidity_pct}%`
      );

      // Store in time-series database
      await storeTelemetry({
        device_id: deviceId,
        region,
        site,
        device_type: deviceType,
        timestamp: new Date(payload.timestamp * 1000),
        temperature: payload.readings.temperature_c,
        humidity: payload.readings.humidity_pct,
        battery: payload.readings.battery_v,
      });

      // Check alert thresholds
      if (payload.readings.temperature_c > 30) {
        await channel.publish(
          "amq.topic",
          `${region}.${site}.${deviceType}.${deviceId}.alert`,
          Buffer.from(JSON.stringify({
            device_id: deviceId,
            alert: "high_temperature",
            value: payload.readings.temperature_c,
            threshold: 30,
            timestamp: payload.timestamp,
          }))
        );
      }

      channel.ack(msg);
    } catch (err) {
      console.error("[Ingester] Processing error:", err.message);
      // Reject and send to dead-letter queue
      channel.nack(msg, false, false);
    }
  });

  // Graceful shutdown
  process.on("SIGTERM", async () => {
    console.log("[Ingester] Shutting down...");
    await channel.close();
    await connection.close();
    process.exit(0);
  });
}

startConsumer().catch(console.error);

Notice how the Python device publishes over MQTT to eu-west/factory-berlin/sensor/sensor-042/telemetry and the Node.js service consumes over AMQP with routing key eu-west.#.telemetry. RabbitMQ handles the protocol translation transparently.

Handling 10,000+ Concurrent Device Connections

Scaling from a proof-of-concept with 10 devices to a production deployment with 10K+ concurrent connections requires deliberate configuration tuning.

Connection Limits and OS Tuning

# /etc/rabbitmq/rabbitmq.conf

# Increase max connections (default is ~65K but OS limits apply)
mqtt.tcp_listen_options.backlog = 4096
mqtt.tcp_listen_options.nodelay = true
mqtt.tcp_listen_options.sndbuf = 32768
mqtt.tcp_listen_options.recbuf = 32768

# File descriptor limit (each connection = 1 FD)
# Set in systemd or Docker: LimitNOFILE=300000

# Memory high watermark (block publishers at 60% RAM)
vm_memory_high_watermark.relative = 0.6

MQTT-Specific Tuning

Setting Recommended Value Why
mqtt.max_session_expiry_interval_seconds 86400 (24h) Limit how long persistent sessions survive. Prevents abandoned sessions from consuming memory indefinitely.
heartbeat 60 Detect dead connections within 2x heartbeat (120s). Critical for cellular devices that silently disappear.
consumer_timeout 1800000 (30 min) Cancel consumers that do not acknowledge messages. Prevents stuck consumers from blocking queues.
mqtt.prefetch 10 Limit in-flight messages per MQTT client. Low value prevents slow devices from being overwhelmed.
channel_max 8 Limit channels per connection. MQTT uses 1 channel, so keep this low.

Prefetch Tuning for Backend AMQP Consumers

While MQTT device prefetch should be low (devices are slow), your backend AMQP consumers should use higher prefetch values:

// Telemetry ingester: high throughput, fast processing
await channel.prefetch(100);

// Alert processor: lower throughput, may call external APIs
await channel.prefetch(10);

// Command dispatcher: low throughput, must be reliable
await channel.prefetch(1);

The rule of thumb: set prefetch equal to the number of messages your consumer can process per network round-trip time. If processing takes 5ms and RTT is 10ms, a prefetch of 2-5 keeps the pipeline full without overloading the consumer.

Security for IoT Deployments

IoT security is non-negotiable. Compromised devices can be weaponized for botnets, and intercepted telemetry can leak sensitive operational data.

Transport Encryption

  • MQTT over TLS (port 8883): All device connections must use TLS 1.2 or higher. Never expose plain MQTT (port 1883) to the internet.
  • AMQPS (port 5671): Backend services connect over AMQP with TLS. Internal traffic within a private network can use plain AMQP (port 5672) if encrypted at the network layer.
  • Certificate pinning: For high-security deployments, pin the broker's TLS certificate in device firmware to prevent man-in-the-middle attacks even if a CA is compromised.

Per-Device Credentials

Never share credentials across devices. If one device is compromised, you need to revoke its access without affecting the fleet:

# RabbitMQ: Create a user per device
rabbitmqctl add_user "device-sensor-042" "unique-random-password"
rabbitmqctl set_permissions -p "/iot" "device-sensor-042" 
    "" 
    "amq.topic" 
    "mqtt-subscription-sensor-042qos1"

Topic-Level ACLs

Restrict each device to only its own topics. A compromised temperature sensor should not be able to publish to an actuator's command topic:

# rabbitmq.conf - Topic authorization
mqtt.default_user = none
mqtt.allow_anonymous = false

# In the topic_permissions, restrict device-sensor-042 to:
# - Publish only to:  eu-west/factory-berlin/sensor/sensor-042/*
# - Subscribe only to: eu-west/factory-berlin/sensor/sensor-042/command

Best practice: Use RabbitMQ's topic authorization plugin with regex patterns. Each device gets write access only to its own subtree and read access only to its command topic. Backend services get broad read access across all device topics.

Real-World IoT Use Cases

Fleet Telemetry

A logistics company tracks 5,000 delivery vehicles. Each vehicle publishes GPS coordinates, speed, fuel level, and engine diagnostics every 10 seconds over MQTT QoS 1. Backend AMQP consumers feed the data into a real-time map, route optimization engine, and predictive maintenance system. RabbitMQ's topic routing ensures the map service only receives GPS data while the maintenance service only receives engine diagnostics.

Smart Building Sensors

A commercial building with 2,000 sensors (temperature, humidity, CO2, occupancy, energy meters) publishes readings every 30 seconds. HVAC controllers subscribe to aggregated zone data to optimize energy consumption. The facility management dashboard subscribes to all data for visualization. Persistent MQTT sessions ensure no readings are lost during brief network disruptions from Wi-Fi roaming.

Industrial Monitoring (IIoT)

A manufacturing plant monitors 500 PLCs and CNC machines. QoS 2 is used for critical machine commands (start/stop/emergency). QoS 1 handles vibration sensor data feeding a predictive maintenance ML model. QoS 0 handles high-frequency (100Hz) accelerometer data where occasional loss is acceptable. RabbitMQ's dead-letter exchanges capture failed command deliveries for manual review.

Precision Agriculture

A farm deploys 800 soil moisture sensors, 50 weather stations, and 20 irrigation controllers across 2,000 hectares. Solar-powered sensors transmit over LoRaWAN to a gateway, which bridges to MQTT. The backend analyzes soil moisture trends, cross-references weather forecasts, and sends irrigation commands back to controllers. Battery voltage telemetry triggers maintenance alerts before sensors go offline.

EU Data Sovereignty for IoT

If your IoT devices operate in Europe or collect data from European users, data sovereignty is not optional. It is law.

EU Cyber Resilience Act (CRA)

The EU Cyber Resilience Act, which entered into force in 2024 with enforcement beginning in 2027, mandates cybersecurity requirements for all products with digital elements sold in the EU. For IoT manufacturers and platform operators, this means:

  • Security by design: Devices must ship with secure defaults (no default passwords, encrypted communication)
  • Vulnerability management: Manufacturers must provide security updates for the expected product lifetime
  • Incident reporting: Actively exploited vulnerabilities must be reported to ENISA within 24 hours
  • Software Bill of Materials (SBOM): Document all software components, including your message broker

GDPR for Connected Devices

When IoT devices collect data that can be linked to individuals (location data, energy usage patterns, building occupancy), GDPR applies. Key requirements:

  • Data minimization: Only collect telemetry you actually need. Do not transmit PII in topic names.
  • Data residency: Personal data must be processed within the EU (or in countries with adequacy decisions). Your message broker must be hosted in an EU data center.
  • Right to erasure: You must be able to purge all messages and stored data for a specific device/user upon request.
  • Encryption in transit and at rest: TLS for MQTT/AMQP connections, and encrypted persistent queues for at-rest protection.

Key insight: Hosting your message broker in a US-owned hyperscaler's EU region does not guarantee compliance. Under the US CLOUD Act, US companies can be compelled to hand over data regardless of where it is stored. Choosing a European-owned infrastructure provider eliminates this legal risk entirely.

DanubeData Managed Queues: MQTT-Ready RabbitMQ in Germany

Building and operating a production MQTT/AMQP broker is complex. You need to manage TLS certificates, monitor connection counts, tune memory watermarks, configure persistence, set up clustering for high availability, and keep up with security patches. For IoT deployments where uptime directly impacts physical operations, this operational burden is significant.

DanubeData Managed Queues give you a fully configured RabbitMQ instance with MQTT support, hosted on dedicated hardware in Falkenstein, Germany:

  • MQTT 3.1.1 and 5.0 support enabled out of the box with TLS termination
  • AMQP 0-9-1 and STOMP available simultaneously on the same broker
  • Persistent NVMe storage for durable queues and persistent sessions
  • Automated backups with point-in-time recovery
  • Private networking between your queues and other DanubeData services (VPS, databases)
  • No data leaves the EU: Hetzner-owned German data centers, no US CLOUD Act exposure
  • Starting at just EUR 9.99/month with plans scaling to handle 10K+ concurrent MQTT connections

Launch a managed RabbitMQ instance with MQTT support

Getting Started in 5 Minutes

  1. Create a Queue instance at danubedata.ro/queues/create and select RabbitMQ
  2. Enable MQTT in the protocol settings (TLS endpoint auto-configured)
  3. Create device credentials via the management UI or API
  4. Connect your devices using the provided MQTT endpoint and TLS certificate
  5. Bind AMQP consumers to amq.topic with your desired routing patterns

Your devices publish over MQTT. Your backend consumes over AMQP. DanubeData handles the broker operations, security patches, monitoring, and backups.

Check our transparent pricing with no hidden egress fees and no per-message charges.

Conclusion

RabbitMQ with MQTT is one of the most practical architectures for IoT messaging platforms. MQTT gives your constrained devices a lightweight, reliable protocol with persistent sessions and QoS guarantees. RabbitMQ gives your backend the full power of AMQP routing, dead-letter handling, and consumer management. Together, they bridge the gap between the device world and the service world without a separate translation layer.

For production IoT deployments in Europe, data sovereignty compliance is non-negotiable. Hosting your message broker on European-owned infrastructure in EU data centers eliminates CLOUD Act risk and simplifies GDPR compliance.

Whether you are monitoring a fleet of vehicles, automating a factory floor, or building the next generation of smart agriculture, the MQTT-to-AMQP bridge through RabbitMQ gives you the flexibility to evolve your architecture as your device fleet grows.

Questions about building an IoT messaging platform? Our team has hands-on experience with large-scale MQTT deployments. Reach out at contact@danubedata.ro or visit danubedata.ro/queues/create to get started.

Share this article

Ready to Get Started?

Deploy your infrastructure in minutes with DanubeData's managed services.