MQTT Plugin¶
Description¶
The MQTT plugin provides real-time messaging integration using the MQTT protocol through the Output plugin system. It registers the mqtt://
URI scheme and serves as an output handler for streaming analytics data to MQTT brokers. The plugin supports both standalone usage and integration with the Output plugin for coordinated data export.
Key Features¶
- Output Handler Integration: Registered
mqtt://
URI scheme with Output plugin - MQTT 3.1.1 and 5.0 Protocol Support: Full protocol compatibility
- TLS/SSL Encrypted Connections: Secure data transmission
- Quality of Service (QoS) Levels: 0, 1, 2 for delivery guarantees
- JSON Message Formatting: Structured data export
- Persistent Sessions: Connection state management
- Authentication Support: Username/password, certificates
- Event-driven Architecture: Real-time analytics streaming
- Connection Management: Automatic reconnection and error handling
When to Use¶
- Real-time analytics data streaming to MQTT brokers
- IoT device integration and communication
- Microservices event-driven architectures
- Cloud platform integration (AWS IoT, Azure IoT Hub, Google Cloud IoT)
- Message queue integration for external systems
- Event notification systems
- Distributed analytics pipeline integration
Requirements¶
Software Dependencies¶
- MQTT broker (Mosquitto, HiveMQ, AWS IoT, etc.)
- Network connectivity to MQTT broker
- Optional: TLS certificates for secure connections
Network Requirements¶
- TCP/IP connectivity to MQTT broker
- Default MQTT port: 1883 (unencrypted) or 8883 (TLS)
- Firewall rules allowing outbound connections
- Low-latency network for real-time streaming
Configuration¶
The MQTT plugin is configured through the Output plugin system using the mqtt://
URI scheme.
Basic Configuration¶
{
"output": {
"handlers": {
"mqtt-events": {
"uri": "mqtt://broker.example.com:1883",
"sink": "events",
"enabled": true,
"config": {
"clientId": "cvedia-rt-001",
"topic": "analytics/detections",
"qos": 1
}
}
}
}
}
Advanced Configuration¶
{
"output": {
"handlers": {
"secure-mqtt": {
"uri": "mqtt://secure-broker.example.com:8883",
"sink": "events",
"enabled": true,
"config": {
"clientId": "cvedia-rt-prod-001",
"topic": "analytics/events",
"qos": 1,
"keepAlive": 60,
"cleanSession": true,
"tls": true,
"authentication": {
"username": "cvedia-client",
"password": "${MQTT_PASSWORD}",
"certPath": "/certs/client-cert.pem",
"keyPath": "/certs/client-key.pem",
"caPath": "/certs/ca-cert.pem"
},
"reconnect": {
"enabled": true,
"interval": 5000,
"maxRetries": 10
}
}
}
}
}
}
Multiple Topic Configuration¶
{
"output": {
"handlers": {
"mqtt-detections": {
"uri": "mqtt://broker.example.com:1883",
"sink": "events",
"enabled": true,
"config": {
"clientId": "cvedia-detections",
"topic": "analytics/detections",
"qos": 1
}
},
"mqtt-diagnostics": {
"uri": "mqtt://broker.example.com:1883",
"sink": "diagnostics",
"enabled": true,
"config": {
"clientId": "cvedia-diagnostics",
"topic": "system/diagnostics",
"qos": 0
}
}
}
}
}
Configuration Schema¶
Parameter | Type | Default | Description |
---|---|---|---|
uri |
string | required | MQTT broker URI (mqtt://host:port) |
sink |
string | "events" | Data sink to connect to |
enabled |
boolean | true | Enable/disable MQTT output |
config.clientId |
string | auto-generated | Unique MQTT client identifier |
config.topic |
string | "analytics/data" | MQTT topic for publishing |
config.qos |
integer | 0 | Quality of service level (0, 1, 2) |
config.keepAlive |
integer | 60 | Keep-alive interval in seconds |
config.cleanSession |
boolean | true | Start with clean session |
config.tls |
boolean | false | Enable TLS/SSL encryption |
config.authentication.username |
string | null | Authentication username |
config.authentication.password |
string | null | Authentication password |
config.authentication.certPath |
string | null | Path to client certificate |
config.authentication.keyPath |
string | null | Path to client private key |
config.authentication.caPath |
string | null | Path to CA certificate |
config.reconnect.enabled |
boolean | true | Enable automatic reconnection |
config.reconnect.interval |
integer | 5000 | Reconnection interval in milliseconds |
config.reconnect.maxRetries |
integer | -1 | Maximum reconnection attempts (-1 = infinite) |
URI Scheme¶
The MQTT plugin registers the mqtt://
URI scheme with the Output plugin system.
URI Format¶
mqtt://[username:password@]host[:port][?options]
Components: - host: MQTT broker hostname or IP address - port: MQTT broker port (default: 1883 for plain, 8883 for TLS) - username: Optional authentication username - password: Optional authentication password - options: URL-encoded query parameters
URI Examples¶
mqtt://localhost:1883 # Local broker, default port
mqtt://broker.example.com:1883 # Remote broker
mqtt://user:[email protected]:8883 # With authentication
mqtt://iot-broker.cloud.com:8883?tls=true # With TLS
API Reference¶
C++ API¶
The MQTT plugin implements both the standalone iface::MQTT
interface and the iface::OutputHandler
interface:
// Output Handler Interface (registered with Output plugin)
class MQTTOutputHandler : public iface::OutputHandler {
public:
// Constructor for output handler
MQTTOutputHandler(const std::string& moduleName,
const std::string& schema,
const std::string& sink,
const std::string& uri,
pCValue config);
// Factory method (registered with Output plugin)
static std::shared_ptr<iface::OutputHandler> create(
const std::string& moduleName,
const std::string& schema,
const std::string& sink,
const std::string& path,
pCValue config);
// Output handler methods
expected<bool> write(pCValue sinkData = VAL(),
std::string dynamicPath = "") override;
void stop() override;
void close() override;
std::string getSink() override;
};
// Standalone MQTT Interface
class MQTTManaged : public iface::MQTT {
public:
// Connection methods
bool connect();
bool connect(const std::string& mqttServer,
const std::string& clientId,
pCValue credentials = nullptr);
bool disconnect();
bool isConnected() const;
// Subscription methods (for standalone use)
bool subscribe(const std::string& topic, int qos = 0);
bool unsubscribe(const std::string& topic);
// Message methods
std::string readMessage();
pCValue readJsonMessage(bool blockingCall = false);
void writeMessage(const std::string& topic, pCValue msg);
// Configuration methods
bool setConfig(pCValue config);
pCValue getConfig() const;
};
Lua API¶
The MQTT plugin is accessible through both the Output plugin system and standalone factory:
-- Via Output plugin system (recommended for data export)
local output = api.factory.output.create(instance, "MQTTExporter")
-- Add MQTT output handler
local mqttHandler = output:addHandler(
"mqtt-events",
"mqtt://broker.example.com:1883",
"events",
{
clientId = "cvedia-analytics",
topic = "analytics/detections",
qos = 1
}
)
-- Standalone MQTT client (for direct control)
local mqtt = api.factory.mqtt.create("MQTTClient")
-- Connection methods
mqtt:connect() -- Uses configuration
mqtt:connect(server, clientId) -- Basic connection
mqtt:connect(server, clientId, certPath) -- With certificate
mqtt:connect(server, clientId, username, password) -- With credentials
mqtt:connect(server, clientId, certPath, username, password) -- Full auth
-- Subscribe to topics (QoS always 0 in Lua)
mqtt:subscribe("topic/path")
-- Read messages
local stringMsg = mqtt:readMessage() -- Returns string
local tableMsg = mqtt:readJsonMessage() -- Returns Lua table
-- Write messages (strings only)
mqtt:writeMessage("topic/path", "message string")
-- Configuration
local config = mqtt:getConfig()
mqtt:saveConfig(newConfig)
Note: The Lua API has some limitations compared to C++:
- subscribe()
doesn't expose the QoS parameter
- readJsonMessage()
doesn't expose the blockingCall parameter
- writeMessage()
only accepts strings, not tables
Examples¶
Basic Event Streaming¶
-- Create output instance for MQTT streaming
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "MQTTEventStreamer")
-- Configure MQTT event export
local mqttConfig = {
clientId = "cvedia-analytics-001",
topic = "analytics/events",
qos = 1,
keepAlive = 60,
cleanSession = true
}
local mqttHandler = output:addHandler(
"event-stream",
"mqtt://broker.example.com:1883",
"events",
mqttConfig
)
if mqttHandler then
api.logging.LogInfo("MQTT event streaming configured")
api.logging.LogInfo("Broker: broker.example.com:1883")
api.logging.LogInfo("Topic: analytics/events")
api.logging.LogInfo("QoS: 1")
-- Events from the "events" sink will be automatically streamed to MQTT
-- The Output plugin handles JSON serialization and message formatting
else
api.logging.LogError("Failed to create MQTT event handler")
end
Secure MQTT with TLS¶
local output = api.factory.output.create(instance, "SecureMQTTExporter")
-- Configure secure MQTT connection
local secureConfig = {
clientId = "cvedia-secure-001",
topic = "secure/analytics/events",
qos = 2, -- Highest QoS for critical data
tls = true,
authentication = {
username = "cvedia-client",
password = os.getenv("MQTT_PASSWORD"),
certPath = "/certs/client.pem",
keyPath = "/certs/client-key.pem",
caPath = "/certs/ca.pem"
},
reconnect = {
enabled = true,
interval = 5000,
maxRetries = 10
}
}
local secureHandler = output:addHandler(
"secure-stream",
"mqtt://secure-broker.example.com:8883",
"events",
secureConfig
)
if secureHandler then
api.logging.LogInfo("Secure MQTT streaming configured")
api.logging.LogInfo("TLS encryption enabled")
api.logging.LogInfo("Certificate-based authentication active")
else
api.logging.LogError("Failed to create secure MQTT handler")
end
Multi-Topic Data Distribution¶
local output = api.factory.output.create(instance, "MQTTDistributor")
-- Configure multiple MQTT outputs for different data types
local mqttConfigs = {
{
name = "detections",
uri = "mqtt://analytics.example.com:1883",
sink = "events",
config = {
clientId = "cvedia-detections",
topic = "analytics/detections",
qos = 1
}
},
{
name = "diagnostics",
uri = "mqtt://monitoring.example.com:1883",
sink = "diagnostics",
config = {
clientId = "cvedia-diagnostics",
topic = "system/diagnostics",
qos = 0 -- Fire-and-forget for diagnostics
}
},
{
name = "alerts",
uri = "mqtt://alerts.example.com:1883",
sink = "alerts",
config = {
clientId = "cvedia-alerts",
topic = "security/alerts",
qos = 2 -- Guaranteed delivery for alerts
}
}
}
-- Create all MQTT handlers
local activeHandlers = {}
for _, config in ipairs(mqttConfigs) do
local handler = output:addHandler(config.name, config.uri, config.sink, config.config)
if handler then
table.insert(activeHandlers, config.name)
api.logging.LogInfo("Created MQTT handler: " .. config.name)
api.logging.LogInfo(" Topic: " .. config.config.topic)
api.logging.LogInfo(" QoS: " .. config.config.qos)
else
api.logging.LogError("Failed to create handler: " .. config.name)
end
end
api.logging.LogInfo("Active MQTT handlers: " .. #activeHandlers)
Cloud Platform Integration¶
-- AWS IoT Core integration
local output = api.factory.output.create(instance, "AWSIoTExporter")
local awsConfig = {
clientId = "cvedia-thing-001",
topic = "$aws/things/cvedia-thing-001/shadow/update",
qos = 1,
tls = true,
authentication = {
certPath = "/certs/aws-thing-cert.pem",
keyPath = "/certs/aws-thing-private.key",
caPath = "/certs/aws-root-ca.pem"
}
}
local awsHandler = output:addHandler(
"aws-iot",
"mqtt://xxxxx.iot.us-east-1.amazonaws.com:8883",
"events",
awsConfig
)
-- Azure IoT Hub integration
local azureConfig = {
clientId = "cvedia-device-001",
topic = "devices/cvedia-device-001/messages/events/",
qos = 1,
tls = true,
authentication = {
username = "cvedia-device-001",
password = "SharedAccessSignature sr=..." -- SAS token
}
}
local azureHandler = output:addHandler(
"azure-iot",
"mqtt://myiothub.azure-devices.net:8883",
"events",
azureConfig
)
if awsHandler and azureHandler then
api.logging.LogInfo("Cloud platform integration configured")
api.logging.LogInfo("AWS IoT Core: Active")
api.logging.LogInfo("Azure IoT Hub: Active")
end
Dynamic Topic Publishing¶
local output = api.factory.output.create(instance, "DynamicMQTT")
-- Use dynamic topic generation based on event data
local dynamicConfig = {
clientId = "cvedia-dynamic",
topic = "analytics/{{camera_id}}/{{event_type}}", -- Dynamic topic template
qos = 1,
topicTemplate = true -- Enable topic templating
}
local dynamicHandler = output:addHandler(
"dynamic-mqtt",
"mqtt://broker.example.com:1883",
"events",
dynamicConfig
)
if dynamicHandler then
api.logging.LogInfo("Dynamic MQTT topics configured")
api.logging.LogInfo("Topic template: analytics/{{camera_id}}/{{event_type}}")
api.logging.LogInfo("Topics will be generated based on event data")
-- Example resulting topics:
-- analytics/camera-001/intrusion
-- analytics/camera-002/line-crossing
-- analytics/camera-003/object-detection
end
Standalone MQTT Usage¶
-- For direct MQTT control (not through Output plugin)
local mqtt = api.factory.mqtt.create("DirectMQTT")
-- Connect to broker
local success = mqtt:connect("mqtt://localhost:1883", "cvedia-direct")
if not success then
api.logging.LogError("Failed to connect to MQTT broker")
return
end
-- Subscribe to control commands
mqtt:subscribe("control/cvedia/commands")
-- JSON helper
local json = require("json")
-- Process commands
function processControlCommands()
local message = mqtt:readJsonMessage()
if message then
if message.command == "start_analytics" then
api.logging.LogInfo("Starting analytics...")
elseif message.command == "update_config" then
api.logging.LogInfo("Updating configuration...")
end
end
end
-- Publish status updates
function publishStatus(status)
mqtt:writeMessage("status/cvedia/system", json.encode(status))
end
-- Main processing loop
function onTick()
processControlCommands()
-- Periodic status update
if os.time() % 30 == 0 then -- Every 30 seconds
publishStatus({
timestamp = os.time(),
status = "running",
fps = api.analytics.getCurrentFps()
})
end
end
Best Practices¶
Output Handler Usage¶
-
Use Output Plugin Integration:
- Prefer Output plugin handlers over standalone MQTT clients
- Leverage automatic JSON serialization and error handling
- Benefit from coordinated data export management
-
Topic Design:
- Use hierarchical topics:
analytics/camera-01/detections
- Avoid leading slashes:
analytics/data
not/analytics/data
- Include metadata in topic structure
- Implement topic namespacing for multi-tenant systems
- Use hierarchical topics:
-
QoS Selection:
- QoS 0: Real-time data that can tolerate loss
- QoS 1: Important events that need delivery confirmation
- QoS 2: Critical alerts requiring exactly-once delivery
Connection Management¶
-
Client ID Strategy:
- Use unique client IDs to avoid conflicts
- Include instance/camera identifiers
- Consider using timestamps or UUIDs for uniqueness
-
Reconnection Logic:
- Enable automatic reconnection for production
- Use exponential backoff for retry intervals
- Set reasonable maximum retry limits
- Monitor connection status and log events
-
Resource Management:
- Clean up connections properly on shutdown
- Use clean sessions unless persistence is required
- Monitor memory usage for high-throughput scenarios
Security¶
-
Always Use TLS in Production:
- Use port 8883 for TLS connections
- Validate broker certificates
- Keep certificates updated and rotated
-
Authentication:
- Use certificate-based authentication when possible
- Rotate credentials regularly
- Store passwords securely (environment variables)
- Implement proper access controls
-
Data Protection:
- Encrypt sensitive data before transmission
- Validate message integrity
- Implement message-level encryption for high-security scenarios
Performance¶
-
Message Optimization:
- Keep messages small (< 256KB recommended)
- Use efficient JSON serialization
- Batch small messages when possible
- Implement message compression for large payloads
-
Network Efficiency:
- Monitor network latency and bandwidth
- Use local brokers for edge deployments
- Implement message throttling for high-frequency data
- Consider message queuing for burst scenarios
-
Broker Considerations:
- Monitor broker performance and resource usage
- Implement proper broker clustering for high availability
- Use appropriate broker persistence settings
- Set up monitoring and alerting for broker health
Troubleshooting¶
Connection Issues¶
-
"Connection refused"
- Verify broker URL and port:
telnet broker.example.com 1883
- Check network connectivity and firewall rules
- Validate credentials and authentication
- Check broker logs for connection attempts
- Verify client ID uniqueness
- Verify broker URL and port:
-
"SSL/TLS handshake failed"
- Verify certificate paths and file permissions
- Check certificate validity:
openssl x509 -in cert.pem -text
- Ensure proper CA certificate chain
- Verify TLS version compatibility with broker
- Check for certificate expiration
-
"Client ID already in use"
- Ensure unique client IDs across all connections
- Check for duplicate CVEDIA-RT instances
- Use timestamps or UUIDs in client ID generation
- Implement client ID conflict resolution
Message Delivery Issues¶
-
Messages not being published
- Check Output plugin handler registration
- Verify sink data is available and correct format
- Monitor MQTT connection status
- Check broker topic permissions and ACLs
- Validate message size limits
-
Messages not reaching subscribers
- Verify topic names match exactly (case-sensitive)
- Check broker retained message settings
- Monitor broker queue limits and memory usage
- Verify QoS settings between publisher and subscriber
- Check subscriber connection status
-
High message latency
- Use QoS 0 for real-time applications
- Check network latency to broker
- Monitor broker CPU and memory usage
- Consider using local/edge brokers
- Implement message timestamping for latency monitoring
Output Handler Issues¶
-
Handler creation fails
- Check MQTT broker connectivity before handler creation
- Verify URI format and parameters
- Validate configuration parameters
- Check file permissions for certificate files
- Monitor system resources
-
Data not flowing through handler
- Verify sink name matches data source
- Check if handler is enabled in configuration
- Monitor Output plugin event processing
- Validate data format compatibility
- Check for JSON serialization errors
-
Performance degradation
- Monitor message queue sizes
- Check network bandwidth utilization
- Implement message batching if appropriate
- Monitor broker response times
- Consider scaling broker infrastructure
Debugging Tools¶
-
MQTT Client Tools:
# Test connection with mosquitto clients mosquitto_pub -h broker.example.com -t test/topic -m "test message" mosquitto_sub -h broker.example.com -t "analytics/#" -v # Test TLS connection mosquitto_pub -h secure-broker.com -p 8883 --cafile ca.pem \ --cert client.pem --key client-key.pem \ -t test/topic -m "secure test"
-
Network Debugging:
# Test connectivity telnet broker.example.com 1883 # Check TLS handshake openssl s_client -connect broker.example.com:8883 -servername broker.example.com # Monitor network traffic tcpdump -i any -n port 1883
-
Log Analysis:
- Enable MQTT plugin debug logging
- Monitor Output plugin handler status
- Check broker connection logs
- Analyze message delivery statistics
Integration Examples¶
AWS IoT Core¶
{
"output": {
"handlers": {
"aws-iot": {
"uri": "mqtt://xxxxx.iot.us-east-1.amazonaws.com:8883",
"sink": "events",
"enabled": true,
"config": {
"clientId": "cvedia-thing-001",
"topic": "$aws/things/cvedia-thing-001/shadow/update",
"qos": 1,
"tls": true,
"authentication": {
"certPath": "/certs/aws-device-cert.pem",
"keyPath": "/certs/aws-device-private.key",
"caPath": "/certs/aws-root-ca.pem"
}
}
}
}
}
}
Azure IoT Hub¶
{
"output": {
"handlers": {
"azure-iot": {
"uri": "mqtt://myiothub.azure-devices.net:8883",
"sink": "events",
"enabled": true,
"config": {
"clientId": "cvedia-device-001",
"topic": "devices/cvedia-device-001/messages/events/",
"qos": 1,
"tls": true,
"authentication": {
"username": "cvedia-device-001",
"password": "SharedAccessSignature sr=myiothub.azure-devices.net%2Fdevices%2Fcvedia-device-001&sig=..."
}
}
}
}
}
}
Google Cloud IoT Core¶
{
"output": {
"handlers": {
"gcp-iot": {
"uri": "mqtt://mqtt.googleapis.com:8883",
"sink": "events",
"enabled": true,
"config": {
"clientId": "projects/my-project/locations/us-central1/registries/my-registry/devices/cvedia-device",
"topic": "/devices/cvedia-device/events",
"qos": 1,
"tls": true,
"authentication": {
"username": "unused",
"password": "jwt_token_here"
}
}
}
}
}
}
Local Broker Setup¶
# Install and configure Mosquitto
sudo apt-get install mosquitto mosquitto-clients
# Configure broker (/etc/mosquitto/mosquitto.conf)
port 1883
allow_anonymous true
log_dest file /var/log/mosquitto/mosquitto.log
# Start broker
sudo systemctl start mosquitto
sudo systemctl enable mosquitto
# Test configuration
mosquitto_sub -h localhost -t "analytics/#" &
mosquitto_pub -h localhost -t "analytics/test" -m "Hello CVEDIA-RT"
See Also¶
- Output Plugins Overview
- Output Plugin - Main output coordination system
- WriteData Plugin - File-based output
- REST Plugin - HTTP API integration
- GStreamerWriter Plugin - Video streaming
- HLS Plugin - HTTP Live Streaming
- Dynamic Strings Documentation - Topic templating
- Lua Scripting Reference - Full API documentation