MQTT Plugin

Description

The MQTT plugin provides real-time messaging integration using the MQTT protocol through the Output plugin system. It registers the mqtt:// URI scheme and serves as an output handler for streaming analytics data to MQTT brokers. The plugin supports both standalone usage and integration with the Output plugin for coordinated data export.

Key Features

  • Output Handler Integration: Registers the mqtt:// URI scheme with the Output plugin
  • MQTT 3.1.1 and 5.0 Protocol Support: Full protocol compatibility
  • TLS/SSL Encrypted Connections: Secure data transmission
  • Quality of Service (QoS) Levels: 0 (at most once), 1 (at least once), and 2 (exactly once)
  • JSON Message Formatting: Structured data export
  • Persistent Sessions: Connection state management
  • Authentication Support: Username/password, certificates
  • Event-driven Architecture: Real-time analytics streaming
  • Connection Management: Automatic reconnection and error handling

When to Use

  • Real-time analytics data streaming to MQTT brokers
  • IoT device integration and communication
  • Microservices event-driven architectures
  • Cloud platform integration (AWS IoT, Azure IoT Hub, Google Cloud IoT)
  • Message queue integration for external systems
  • Event notification systems
  • Distributed analytics pipeline integration

Requirements

Software Dependencies

  • MQTT broker (Mosquitto, HiveMQ, AWS IoT, etc.)
  • Network connectivity to MQTT broker
  • Optional: TLS certificates for secure connections

Network Requirements

  • TCP/IP connectivity to MQTT broker
  • Default MQTT port: 1883 (unencrypted) or 8883 (TLS)
  • Firewall rules allowing outbound connections
  • Low-latency network for real-time streaming

Configuration

The MQTT plugin is configured through the Output plugin system using the mqtt:// URI scheme.

Basic Configuration

{
  "output": {
    "handlers": {
      "mqtt-events": {
        "uri": "mqtt://broker.example.com:1883",
        "sink": "events",
        "enabled": true,
        "config": {
          "clientId": "cvedia-rt-001",
          "topic": "analytics/detections",
          "qos": 1
        }
      }
    }
  }
}

Advanced Configuration

{
  "output": {
    "handlers": {
      "secure-mqtt": {
        "uri": "mqtt://secure-broker.example.com:8883",
        "sink": "events",
        "enabled": true,
        "config": {
          "clientId": "cvedia-rt-prod-001",
          "topic": "analytics/events",
          "qos": 1,
          "keepAlive": 60,
          "cleanSession": true,
          "tls": true,
          "authentication": {
            "username": "cvedia-client",
            "password": "${MQTT_PASSWORD}",
            "certPath": "/certs/client-cert.pem",
            "keyPath": "/certs/client-key.pem",
            "caPath": "/certs/ca-cert.pem"
          },
          "reconnect": {
            "enabled": true,
            "interval": 5000,
            "maxRetries": 10
          }
        }
      }
    }
  }
}

Multiple Topic Configuration

{
  "output": {
    "handlers": {
      "mqtt-detections": {
        "uri": "mqtt://broker.example.com:1883",
        "sink": "events",
        "enabled": true,
        "config": {
          "clientId": "cvedia-detections",
          "topic": "analytics/detections",
          "qos": 1
        }
      },
      "mqtt-diagnostics": {
        "uri": "mqtt://broker.example.com:1883", 
        "sink": "diagnostics",
        "enabled": true,
        "config": {
          "clientId": "cvedia-diagnostics",
          "topic": "system/diagnostics",
          "qos": 0
        }
      }
    }
  }
}

Configuration Schema

| Parameter                      | Type    | Default          | Description                                   |
|--------------------------------|---------|------------------|-----------------------------------------------|
| uri                            | string  | required         | MQTT broker URI (mqtt://host:port)            |
| sink                           | string  | "events"         | Data sink to connect to                       |
| enabled                        | boolean | true             | Enable/disable MQTT output                    |
| config.clientId                | string  | auto-generated   | Unique MQTT client identifier                 |
| config.topic                   | string  | "analytics/data" | MQTT topic for publishing                     |
| config.qos                     | integer | 0                | Quality of service level (0, 1, 2)            |
| config.keepAlive               | integer | 60               | Keep-alive interval in seconds                |
| config.cleanSession            | boolean | true             | Start with clean session                      |
| config.tls                     | boolean | false            | Enable TLS/SSL encryption                     |
| config.authentication.username | string  | null             | Authentication username                       |
| config.authentication.password | string  | null             | Authentication password                       |
| config.authentication.certPath | string  | null             | Path to client certificate                    |
| config.authentication.keyPath  | string  | null             | Path to client private key                    |
| config.authentication.caPath   | string  | null             | Path to CA certificate                        |
| config.reconnect.enabled       | boolean | true             | Enable automatic reconnection                 |
| config.reconnect.interval      | integer | 5000             | Reconnection interval in milliseconds         |
| config.reconnect.maxRetries    | integer | -1               | Maximum reconnection attempts (-1 = infinite) |
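
The defaults and value ranges in the schema can be checked before a configuration is deployed. The following is a minimal sketch of such a pre-flight check, assuming the configuration has already been loaded into a Python dictionary. The helper name `normalize_handler` is hypothetical and is not part of the plugin; it only mirrors the defaults documented above.

```python
# Hypothetical pre-flight helper: fills in the documented defaults and
# rejects values outside the documented ranges. Not part of the plugin.
DEFAULT_CONFIG = {
    "topic": "analytics/data",
    "qos": 0,
    "keepAlive": 60,
    "cleanSession": True,
    "tls": False,
}


def normalize_handler(handler: dict) -> dict:
    """Return a copy of one handler entry with schema defaults applied."""
    uri = handler.get("uri")
    if not isinstance(uri, str) or not uri.startswith("mqtt://"):
        raise ValueError("uri is required and must use the mqtt:// scheme")

    # User-supplied keys override the defaults.
    config = {**DEFAULT_CONFIG, **handler.get("config", {})}
    if config["qos"] not in (0, 1, 2):
        raise ValueError(f"qos must be 0, 1, or 2, got {config['qos']!r}")
    if not isinstance(config["keepAlive"], int) or config["keepAlive"] < 0:
        raise ValueError("keepAlive must be a non-negative integer (seconds)")

    return {
        "uri": uri,
        "sink": handler.get("sink", "events"),
        "enabled": handler.get("enabled", True),
        "config": config,
    }
```

Calling `normalize_handler({"uri": "mqtt://broker.example.com:1883", "config": {"qos": 1}})` returns an entry whose `sink` is `"events"` and whose `config.topic` is `"analytics/data"`, which makes the effective settings explicit before the handler is created.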

URI Scheme

The MQTT plugin registers the mqtt:// URI scheme with the Output plugin system.

URI Format

mqtt://[username:password@]host[:port][?options]

Components:

  • host: MQTT broker hostname or IP address
  • port: MQTT broker port (default: 1883 for plain, 8883 for TLS)
  • username: Optional authentication username
  • password: Optional authentication password
  • options: URL-encoded query parameters

URI Examples

mqtt://localhost:1883                           # Local broker, default port
mqtt://broker.example.com:1883                  # Remote broker
mqtt://user:pass@broker.example.com:8883        # With authentication
mqtt://iot-broker.cloud.com:8883?tls=true       # With TLS
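
To make the URI format concrete, here is a small sketch that splits an mqtt:// URI into the components listed above using Python's standard `urllib.parse`. The function name `parse_mqtt_uri` and the returned dictionary layout are assumptions for illustration; the plugin's own parser may behave differently, for example in how it treats unknown options.

```python
from urllib.parse import parse_qs, unquote, urlsplit


def parse_mqtt_uri(uri: str) -> dict:
    """Split mqtt://[username:password@]host[:port][?options] into parts."""
    parts = urlsplit(uri)
    if parts.scheme != "mqtt" or not parts.hostname:
        raise ValueError(f"not a valid mqtt:// URI: {uri!r}")

    # Keep the last value when an option is repeated.
    options = {key: values[-1] for key, values in parse_qs(parts.query).items()}
    tls = options.get("tls", "false").lower() == "true"

    return {
        "host": parts.hostname,
        # Documented defaults: 1883 for plain connections, 8883 for TLS.
        "port": parts.port or (8883 if tls else 1883),
        "username": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
        "options": options,
    }
```

For example, `parse_mqtt_uri("mqtt://localhost?tls=true")` yields port 8883 because no explicit port is given and the `tls` option is set.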

API Reference

C++ API

The MQTT plugin implements both the standalone iface::MQTT interface and the iface::OutputHandler interface:

// Output Handler Interface (registered with Output plugin)
class MQTTOutputHandler : public iface::OutputHandler {
public:
    // Constructor for output handler
    MQTTOutputHandler(const std::string& moduleName,
                     const std::string& schema, 
                     const std::string& sink,
                     const std::string& uri, 
                     pCValue config);

    // Factory method (registered with Output plugin)
    static std::shared_ptr<iface::OutputHandler> create(
        const std::string& moduleName,
        const std::string& schema,
        const std::string& sink, 
        const std::string& path,
        pCValue config);

    // Output handler methods
    expected<bool> write(pCValue sinkData = VAL(), 
                        std::string dynamicPath = "") override;
    void stop() override;
    void close() override;
    std::string getSink() override;
};

// Standalone MQTT Interface
class MQTTManaged : public iface::MQTT {
public:
    // Connection methods
    bool connect();
    bool connect(const std::string& mqttServer, 
                const std::string& clientId, 
                pCValue credentials = nullptr);
    bool disconnect();
    bool isConnected() const;

    // Subscription methods (for standalone use)
    bool subscribe(const std::string& topic, int qos = 0);
    bool unsubscribe(const std::string& topic);

    // Message methods
    std::string readMessage();
    pCValue readJsonMessage(bool blockingCall = false);
    void writeMessage(const std::string& topic, pCValue msg);

    // Configuration methods
    bool setConfig(pCValue config);
    pCValue getConfig() const;
};

Lua API

The MQTT plugin is accessible through both the Output plugin system and a standalone factory:

-- Via Output plugin system (recommended for data export)
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "MQTTExporter")

-- Add MQTT output handler
local mqttHandler = output:addHandler(
    "mqtt-events",
    "mqtt://broker.example.com:1883",
    "events",
    {
        clientId = "cvedia-analytics",
        topic = "analytics/detections",
        qos = 1
    }
)

-- Standalone MQTT client (for direct control)
local mqtt = api.factory.mqtt.create("MQTTClient")

-- Connection methods
mqtt:connect()  -- Uses configuration
mqtt:connect(server, clientId)  -- Basic connection
mqtt:connect(server, clientId, certPath)  -- With certificate
mqtt:connect(server, clientId, username, password)  -- With credentials
mqtt:connect(server, clientId, certPath, username, password)  -- Full auth

-- Subscribe to topics (QoS always 0 in Lua)
mqtt:subscribe("topic/path")

-- Read messages
local stringMsg = mqtt:readMessage()  -- Returns string
local tableMsg = mqtt:readJsonMessage()  -- Returns Lua table

-- Write messages (strings only)
mqtt:writeMessage("topic/path", "message string")

-- Configuration
local config = mqtt:getConfig()
mqtt:saveConfig(newConfig)

Note: The Lua API has some limitations compared to C++:

  • subscribe() doesn't expose the QoS parameter
  • readJsonMessage() doesn't expose the blockingCall parameter
  • writeMessage() only accepts strings, not tables

Full Lua API Reference →

Examples

Basic Event Streaming

-- Create output instance for MQTT streaming
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "MQTTEventStreamer")

-- Configure MQTT event export
local mqttConfig = {
    clientId = "cvedia-analytics-001",
    topic = "analytics/events",
    qos = 1,
    keepAlive = 60,
    cleanSession = true
}

local mqttHandler = output:addHandler(
    "event-stream",
    "mqtt://broker.example.com:1883",
    "events",
    mqttConfig
)

if mqttHandler then
    api.logging.LogInfo("MQTT event streaming configured")
    api.logging.LogInfo("Broker: broker.example.com:1883")
    api.logging.LogInfo("Topic: analytics/events")
    api.logging.LogInfo("QoS: 1")

    -- Events from the "events" sink will be automatically streamed to MQTT
    -- The Output plugin handles JSON serialization and message formatting
else
    api.logging.LogError("Failed to create MQTT event handler")
end

Secure MQTT with TLS

local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "SecureMQTTExporter")

-- Configure secure MQTT connection
local secureConfig = {
    clientId = "cvedia-secure-001",
    topic = "secure/analytics/events",
    qos = 2,  -- Highest QoS for critical data
    tls = true,
    authentication = {
        username = "cvedia-client",
        password = os.getenv("MQTT_PASSWORD"),
        certPath = "/certs/client.pem",
        keyPath = "/certs/client-key.pem", 
        caPath = "/certs/ca.pem"
    },
    reconnect = {
        enabled = true,
        interval = 5000,
        maxRetries = 10
    }
}

local secureHandler = output:addHandler(
    "secure-stream",
    "mqtt://secure-broker.example.com:8883",
    "events",
    secureConfig
)

if secureHandler then
    api.logging.LogInfo("Secure MQTT streaming configured")
    api.logging.LogInfo("TLS encryption enabled")
    api.logging.LogInfo("Certificate-based authentication active")
else
    api.logging.LogError("Failed to create secure MQTT handler")
end

Multi-Topic Data Distribution

local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "MQTTDistributor")

-- Configure multiple MQTT outputs for different data types
local mqttConfigs = {
    {
        name = "detections",
        uri = "mqtt://analytics.example.com:1883",
        sink = "events",
        config = {
            clientId = "cvedia-detections",
            topic = "analytics/detections",  
            qos = 1
        }
    },
    {
        name = "diagnostics", 
        uri = "mqtt://monitoring.example.com:1883",
        sink = "diagnostics",
        config = {
            clientId = "cvedia-diagnostics",
            topic = "system/diagnostics",
            qos = 0  -- Fire-and-forget for diagnostics
        }
    },
    {
        name = "alerts",
        uri = "mqtt://alerts.example.com:1883", 
        sink = "alerts",
        config = {
            clientId = "cvedia-alerts",
            topic = "security/alerts",
            qos = 2  -- Guaranteed delivery for alerts
        }
    }
}

-- Create all MQTT handlers
local activeHandlers = {}
for _, config in ipairs(mqttConfigs) do
    local handler = output:addHandler(config.name, config.uri, config.sink, config.config)
    if handler then
        table.insert(activeHandlers, config.name)
        api.logging.LogInfo("Created MQTT handler: " .. config.name)
        api.logging.LogInfo("  Topic: " .. config.config.topic)
        api.logging.LogInfo("  QoS: " .. config.config.qos)
    else
        api.logging.LogError("Failed to create handler: " .. config.name)
    end
end

api.logging.LogInfo("Active MQTT handlers: " .. #activeHandlers)

Cloud Platform Integration

-- AWS IoT Core integration
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "AWSIoTExporter")

local awsConfig = {
    clientId = "cvedia-thing-001", 
    topic = "$aws/things/cvedia-thing-001/shadow/update",
    qos = 1,
    tls = true,
    authentication = {
        certPath = "/certs/aws-thing-cert.pem",
        keyPath = "/certs/aws-thing-private.key",
        caPath = "/certs/aws-root-ca.pem"
    }
}

local awsHandler = output:addHandler(
    "aws-iot",
    "mqtt://xxxxx.iot.us-east-1.amazonaws.com:8883",
    "events",
    awsConfig
)

-- Azure IoT Hub integration
local azureConfig = {
    clientId = "cvedia-device-001",
    topic = "devices/cvedia-device-001/messages/events/",
    qos = 1,
    tls = true,
    authentication = {
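        -- Azure IoT Hub expects {hub hostname}/{device id}/?api-version=... as the MQTT username
        username = "myiothub.azure-devices.net/cvedia-device-001/?api-version=2021-04-12",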
        password = "SharedAccessSignature sr=..."  -- SAS token
    }
}

local azureHandler = output:addHandler(
    "azure-iot",
    "mqtt://myiothub.azure-devices.net:8883",
    "events", 
    azureConfig
)

if awsHandler and azureHandler then
    api.logging.LogInfo("Cloud platform integration configured")
    api.logging.LogInfo("AWS IoT Core: Active")
    api.logging.LogInfo("Azure IoT Hub: Active")
end

Dynamic Topic Publishing

local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "DynamicMQTT")

-- Use dynamic topic generation based on event data
local dynamicConfig = {
    clientId = "cvedia-dynamic",
    topic = "analytics/{{camera_id}}/{{event_type}}", -- Dynamic topic template
    qos = 1,
    topicTemplate = true  -- Enable topic templating
}

local dynamicHandler = output:addHandler(
    "dynamic-mqtt",
    "mqtt://broker.example.com:1883",
    "events",
    dynamicConfig
)

if dynamicHandler then
    api.logging.LogInfo("Dynamic MQTT topics configured")
    api.logging.LogInfo("Topic template: analytics/{{camera_id}}/{{event_type}}")
    api.logging.LogInfo("Topics will be generated based on event data")

    -- Example resulting topics:
    -- analytics/camera-001/intrusion
    -- analytics/camera-002/line-crossing
    -- analytics/camera-003/object-detection
end
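
The `{{field}}` placeholders in the topic template are filled from each event. The sketch below illustrates one way such an expansion could work; it is not the plugin's implementation. The function name `render_topic` is hypothetical, and the choice to reject values containing `/`, `+`, or `#` is an assumption made here so that event data cannot add topic levels or wildcard characters.

```python
import re

# Matches {{name}} with optional whitespace inside the braces.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_topic(template: str, event: dict) -> str:
    """Replace each {{field}} in the template with the event's value."""

    def substitute(match):
        key = match.group(1)
        if key not in event:
            raise KeyError(f"event has no field {key!r} for the topic template")
        value = str(event[key])
        # Assumption: refuse values that would change the topic structure.
        if any(ch in value for ch in "/+#"):
            raise ValueError(f"field {key!r} contains a reserved topic character")
        return value

    return _PLACEHOLDER.sub(substitute, template)
```

With the template `analytics/{{camera_id}}/{{event_type}}` and an event `{"camera_id": "camera-001", "event_type": "intrusion"}`, the function returns `analytics/camera-001/intrusion`.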

Standalone MQTT Usage

-- For direct MQTT control (not through Output plugin)
local mqtt = api.factory.mqtt.create("DirectMQTT")

-- Connect to broker
local success = mqtt:connect("mqtt://localhost:1883", "cvedia-direct")
if not success then
    api.logging.LogError("Failed to connect to MQTT broker")
    return
end

-- Subscribe to control commands
mqtt:subscribe("control/cvedia/commands")

-- JSON helper
local json = require("json")

-- Process commands
function processControlCommands()
    local message = mqtt:readJsonMessage()
    if message then
        if message.command == "start_analytics" then
            api.logging.LogInfo("Starting analytics...")
        elseif message.command == "update_config" then
            api.logging.LogInfo("Updating configuration...")
        end
    end
end

-- Publish status updates
function publishStatus(status)
    mqtt:writeMessage("status/cvedia/system", json.encode(status))
end

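-- Main processing loop
local STATUS_INTERVAL = 30  -- Seconds between status updates
local lastStatusTime = 0

function onTick()
    processControlCommands()

    -- Periodic status update. Comparing against the last publish time avoids
    -- publishing on every tick within one second or missing an interval.
    local now = os.time()
    if now - lastStatusTime >= STATUS_INTERVAL then
        lastStatusTime = now
        publishStatus({
            timestamp = now,
            status = "running",
            fps = api.analytics.getCurrentFps()
        })
    end
end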

Best Practices

Output Handler Usage

  1. Use Output Plugin Integration:

    • Prefer Output plugin handlers over standalone MQTT clients
    • Leverage automatic JSON serialization and error handling
    • Benefit from coordinated data export management
  2. Topic Design:

    • Use hierarchical topics: analytics/camera-01/detections
    • Avoid leading slashes: analytics/data not /analytics/data
    • Include metadata in topic structure
    • Implement topic namespacing for multi-tenant systems
  3. QoS Selection:

    • QoS 0: Real-time data that can tolerate loss
    • QoS 1: Important events that need delivery confirmation
    • QoS 2: Critical alerts requiring exactly-once delivery
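
The topic design guidelines above can be turned into a quick check that runs before a topic is written into a handler configuration. This is a sketch only, with a hypothetical function name. The first four checks reflect MQTT rules for publish topic names (non-empty, no wildcards, no null character, at most 65535 bytes of UTF-8). The leading-slash check is the style recommendation from this section, and some platforms require a leading slash.

```python
def check_publish_topic(topic: str) -> list:
    """Return a list of problems found in a topic used for publishing."""
    problems = []
    if not topic:
        problems.append("topic is empty")
    if "+" in topic or "#" in topic:
        problems.append("wildcards (+, #) are not allowed when publishing")
    if "\x00" in topic:
        problems.append("topic contains a null character")
    if len(topic.encode("utf-8")) > 65535:
        problems.append("topic exceeds 65535 bytes")
    # Style guideline rather than a protocol rule.
    if topic.startswith("/"):
        problems.append("leading slash creates an empty first topic level")
    return problems
```

An empty list means the topic passed every check, as it does for `analytics/camera-01/detections`.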

Connection Management

  1. Client ID Strategy:

    • Use unique client IDs to avoid conflicts
    • Include instance/camera identifiers
    • Consider using timestamps or UUIDs for uniqueness
  2. Reconnection Logic:

    • Enable automatic reconnection for production
    • Use exponential backoff for retry intervals
    • Set reasonable maximum retry limits
    • Monitor connection status and log events
  3. Resource Management:

    • Clean up connections properly on shutdown
    • Use clean sessions unless persistence is required
    • Monitor memory usage for high-throughput scenarios
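
The configuration schema exposes a fixed `reconnect.interval` and `reconnect.maxRetries`; this page does not say whether the plugin backs off internally. For a standalone client or an external supervisor, exponential backoff could look like the sketch below. The function name, the growth factor, the 60-second cap, and the jitter fraction are all illustrative assumptions.

```python
import random


def backoff_delays(base_ms=5000, factor=2.0, cap_ms=60000,
                   max_retries=10, jitter=0.1, rng=None):
    """Yield reconnect delays in milliseconds, growing up to cap_ms.

    max_retries < 0 means retry forever, matching the documented -1.
    """
    rng = rng or random.Random()
    delay = float(base_ms)
    attempt = 0
    while max_retries < 0 or attempt < max_retries:
        # Jitter spreads reconnects so many clients do not retry in lockstep.
        spread = delay * jitter
        yield delay + rng.uniform(-spread, spread)
        delay = min(float(cap_ms), delay * factor)
        attempt += 1
```

With `jitter=0` the first delays are 5000, 10000, 20000, 40000, and then 60000 ms for every later attempt. A caller would sleep for each yielded delay between connection attempts and stop iterating once the connection succeeds.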

Security

  1. Always Use TLS in Production:

    • Use port 8883 for TLS connections
    • Validate broker certificates
    • Keep certificates updated and rotated
  2. Authentication:

    • Use certificate-based authentication when possible
    • Rotate credentials regularly
    • Store passwords securely (environment variables)
    • Implement proper access controls
  3. Data Protection:

    • Encrypt sensitive data before transmission
    • Validate message integrity
    • Implement message-level encryption for high-security scenarios

Performance

  1. Message Optimization:

    • Keep messages small (< 256KB recommended)
    • Use efficient JSON serialization
    • Batch small messages when possible
    • Implement message compression for large payloads
  2. Network Efficiency:

    • Monitor network latency and bandwidth
    • Use local brokers for edge deployments
    • Implement message throttling for high-frequency data
    • Consider message queuing for burst scenarios
  3. Broker Considerations:

    • Monitor broker performance and resource usage
    • Implement proper broker clustering for high availability
    • Use appropriate broker persistence settings
    • Set up monitoring and alerting for broker health
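
As an illustration of the message optimization points, the sketch below serializes an event compactly, compresses it when it is large, and enforces the 256 KB guideline. The function name, the 16 KB compression threshold, and the use of zlib are assumptions made for this example. Subscribers would need to know that a payload is compressed, for instance through a topic convention, which this sketch does not cover.

```python
import json
import zlib

MAX_PAYLOAD_BYTES = 256 * 1024  # the "< 256KB" guideline from this section


def encode_payload(event: dict, compress_over: int = 16 * 1024):
    """Return (payload_bytes, was_compressed) for one event."""
    # Compact separators drop the spaces json.dumps inserts by default.
    raw = json.dumps(event, separators=(",", ":")).encode("utf-8")
    compressed = len(raw) > compress_over
    body = zlib.compress(raw) if compressed else raw
    if len(body) > MAX_PAYLOAD_BYTES:
        raise ValueError(
            f"payload is {len(body)} bytes, over the {MAX_PAYLOAD_BYTES}-byte guideline"
        )
    return body, compressed
```

Small events pass through as plain JSON bytes, so existing subscribers are affected only when a payload crosses the compression threshold.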

Troubleshooting

Connection Issues

  1. "Connection refused"

    • Verify broker URL and port: telnet broker.example.com 1883
    • Check network connectivity and firewall rules
    • Validate credentials and authentication
    • Check broker logs for connection attempts
    • Verify client ID uniqueness
  2. "SSL/TLS handshake failed"

    • Verify certificate paths and file permissions
    • Check certificate validity: openssl x509 -in cert.pem -noout -text
    • Ensure proper CA certificate chain
    • Verify TLS version compatibility with broker
    • Check for certificate expiration: openssl x509 -in cert.pem -noout -enddate
  3. "Client ID already in use"

    • Ensure unique client IDs across all connections
    • Check for duplicate CVEDIA-RT instances
    • Use timestamps or UUIDs in client ID generation
    • Implement client ID conflict resolution
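
One way to approach the client ID points above is to generate IDs with a random suffix and to scan a configuration for duplicates before deployment. Both helper names below are hypothetical. The default prefix plus a 12-character suffix gives 22 characters, which stays within the 23-character length that MQTT 3.1.1 brokers are required to accept. Note that a random client ID changes on every start, so it does not suit persistent sessions (cleanSession = false), which are tied to the client ID.

```python
import uuid
from collections import Counter


def make_client_id(prefix: str = "cvedia-rt") -> str:
    """Return a client ID such as cvedia-rt-3f9c2a7b1d04."""
    return f"{prefix}-{uuid.uuid4().hex[:12]}"


def duplicate_client_ids(output_config: dict) -> list:
    """List client IDs used by more than one handler in the config."""
    handlers = output_config.get("output", {}).get("handlers", {})
    ids = [h.get("config", {}).get("clientId") for h in handlers.values()]
    counts = Counter(client_id for client_id in ids if client_id)
    return sorted(client_id for client_id, n in counts.items() if n > 1)
```

Running `duplicate_client_ids` over the loaded JSON configuration returns an empty list when every handler has its own ID.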

Message Delivery Issues

  1. Messages not being published

    • Check Output plugin handler registration
    • Verify sink data is available and correct format
    • Monitor MQTT connection status
    • Check broker topic permissions and ACLs
    • Validate message size limits
  2. Messages not reaching subscribers

    • Verify topic names match exactly (case-sensitive)
    • Check broker retained message settings
    • Monitor broker queue limits and memory usage
    • Verify QoS settings between publisher and subscriber
    • Check subscriber connection status
  3. High message latency

    • Use QoS 0 for real-time applications
    • Check network latency to broker
    • Monitor broker CPU and memory usage
    • Consider using local/edge brokers
    • Implement message timestamping for latency monitoring
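
Message timestamping for latency monitoring can be as simple as adding a send time to the payload and subtracting it on the subscriber side. The sketch below assumes JSON payloads and a hypothetical field name, `sent_at`. The measurement is only meaningful when publisher and subscriber clocks are synchronized, for example through NTP.

```python
import json
import time


def stamp(event: dict, now=time.time) -> str:
    """Serialize an event with the publish time (seconds since the epoch)."""
    return json.dumps({**event, "sent_at": now()})


def latency_ms(payload: str, now=time.time) -> float:
    """Return the elapsed time since the payload was stamped, in ms."""
    sent_at = json.loads(payload)["sent_at"]
    return (now() - sent_at) * 1000.0
```

The `now` parameter exists so the clock can be replaced in tests; in normal use both functions read the system clock.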

Output Handler Issues

  1. Handler creation fails

    • Check MQTT broker connectivity before handler creation
    • Verify URI format and parameters
    • Validate configuration parameters
    • Check file permissions for certificate files
    • Monitor system resources
  2. Data not flowing through handler

    • Verify sink name matches data source
    • Check if handler is enabled in configuration
    • Monitor Output plugin event processing
    • Validate data format compatibility
    • Check for JSON serialization errors
  3. Performance degradation

    • Monitor message queue sizes
    • Check network bandwidth utilization
    • Implement message batching if appropriate
    • Monitor broker response times
    • Consider scaling broker infrastructure

Debugging Tools

  1. MQTT Client Tools:

    # Test connection with mosquitto clients
    mosquitto_pub -h broker.example.com -t test/topic -m "test message"
    mosquitto_sub -h broker.example.com -t "analytics/#" -v
    
    # Test TLS connection
    mosquitto_pub -h secure-broker.com -p 8883 --cafile ca.pem \
                  --cert client.pem --key client-key.pem \
                  -t test/topic -m "secure test"
    

  2. Network Debugging:

    # Test connectivity
    telnet broker.example.com 1883
    
    # Check TLS handshake
    openssl s_client -connect broker.example.com:8883 -servername broker.example.com
    
    # Monitor network traffic
    tcpdump -i any -n port 1883
    

  3. Log Analysis:

    • Enable MQTT plugin debug logging
    • Monitor Output plugin handler status
    • Check broker connection logs
    • Analyze message delivery statistics

Integration Examples

AWS IoT Core

{
  "output": {
    "handlers": {
      "aws-iot": {
        "uri": "mqtt://xxxxx.iot.us-east-1.amazonaws.com:8883",
        "sink": "events",
        "enabled": true,
        "config": {
          "clientId": "cvedia-thing-001",
          "topic": "$aws/things/cvedia-thing-001/shadow/update",
          "qos": 1,
          "tls": true,
          "authentication": {
            "certPath": "/certs/aws-device-cert.pem",
            "keyPath": "/certs/aws-device-private.key", 
            "caPath": "/certs/aws-root-ca.pem"
          }
        }
      }
    }
  }
}

Azure IoT Hub

{
  "output": {
    "handlers": {
      "azure-iot": {
        "uri": "mqtt://myiothub.azure-devices.net:8883",
        "sink": "events",
        "enabled": true,
        "config": {
          "clientId": "cvedia-device-001",
          "topic": "devices/cvedia-device-001/messages/events/",
          "qos": 1,
          "tls": true,
          "authentication": {
            "username": "myiothub.azure-devices.net/cvedia-device-001/?api-version=2021-04-12",
            "password": "SharedAccessSignature sr=myiothub.azure-devices.net%2Fdevices%2Fcvedia-device-001&sig=..."
          }
        }
      }
    }
  }
}
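
The password in the Azure example is a shared access signature (SAS) token. The sketch below shows how such a token is commonly generated from a device key: an HMAC-SHA256 signature over the URL-encoded resource URI and the expiry time. It follows the token format in the example above. The function name and parameters are illustrative, and the result should be checked against the current Azure IoT Hub documentation before use.

```python
import base64
import hashlib
import hmac
import time
from urllib.parse import quote_plus


def make_sas_token(resource_uri: str, device_key_b64: str,
                   ttl_seconds: int = 3600, now=time.time) -> str:
    """Build a SharedAccessSignature string for a device-scoped resource.

    resource_uri: e.g. "myiothub.azure-devices.net/devices/cvedia-device-001"
    device_key_b64: the device's base64-encoded symmetric key
    """
    expiry = int(now()) + ttl_seconds
    encoded_uri = quote_plus(resource_uri)

    # The signed string is the encoded URI and the expiry, newline-separated.
    to_sign = f"{encoded_uri}\n{expiry}".encode("utf-8")
    key = base64.b64decode(device_key_b64)
    digest = hmac.new(key, to_sign, hashlib.sha256).digest()
    signature = quote_plus(base64.b64encode(digest).decode("ascii"))

    return f"SharedAccessSignature sr={encoded_uri}&sig={signature}&se={expiry}"
```

Because tokens expire, a long-running deployment needs a way to refresh the password before the `se` time passes; how the plugin picks up a new password is not covered here.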

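Google Cloud IoT Core

Note: Google retired Cloud IoT Core on August 16, 2023. The configuration below follows the conventions that service used and is kept for reference.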

{
  "output": {
    "handlers": {
      "gcp-iot": {
        "uri": "mqtt://mqtt.googleapis.com:8883", 
        "sink": "events",
        "enabled": true,
        "config": {
          "clientId": "projects/my-project/locations/us-central1/registries/my-registry/devices/cvedia-device",
          "topic": "/devices/cvedia-device/events",
          "qos": 1,
          "tls": true,
          "authentication": {
            "username": "unused",
            "password": "jwt_token_here"
          }
        }
      }
    }
  }
}

Local Broker Setup

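# Install Mosquitto and the command-line clients
sudo apt-get install mosquitto mosquitto-clients

# The broker configuration (/etc/mosquitto/mosquitto.conf) should contain
# the following settings. Mosquitto 2.0 and later reject unauthenticated
# clients unless allow_anonymous is set; use it for local testing only.
listener 1883
allow_anonymous true
log_dest file /var/log/mosquitto/mosquitto.log

# Restart the broker to apply the configuration, and start it on boot
sudo systemctl restart mosquitto
sudo systemctl enable mosquitto

# Test configuration
mosquitto_sub -h localhost -t "analytics/#" &
mosquitto_pub -h localhost -t "analytics/test" -m "Hello CVEDIA-RT"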

See Also