Output Plugin¶
Description¶
The Output plugin serves as CVEDIA-RT's central coordinator for data export and output management. Rather than implementing all output functionality itself, it delegates to specialized output plugins while providing a unified interface for managing multiple output handlers. This architecture allows flexible routing of analytics results, events, and metadata to various destinations through purpose-built plugins.
The Output plugin acts as an orchestration layer that:
- Manages multiple output handlers simultaneously
- Routes data to appropriate specialized plugins
- Provides unified configuration and lifecycle management
- Handles event schema versioning and validation
- Coordinates between different output destinations
Key Features¶
- Plugin Coordination: Delegates to specialized plugins for actual output operations
- Handler Management: Create and manage multiple output handlers from different plugins
- Event Schema Management: Version-controlled event formats across all output types
- Unified Configuration: Single configuration point for all output destinations
- Dynamic Routing: Route different event types to appropriate output plugins
- Lifecycle Management: Coordinate initialization, operation, and cleanup of output handlers
- Plugin Integration: Seamlessly works with GStreamerWriter, HLS, WriteData, MQTT, REST, and other output plugins
When to Use¶
- Managing multiple output destinations simultaneously
- Routing different event types to different output plugins
- Centralizing output configuration and management
- Implementing complex output workflows with multiple handlers
- Coordinating between file, streaming, and messaging outputs
- Building flexible data export pipelines
Available Output Plugins¶
The Output plugin coordinates with these specialized plugins:
- GStreamerWriter: Video stream output and encoding
- HLS: HTTP Live Streaming for web-based video delivery
- WriteData: File-based data export (JSON, CSV, XML)
- MQTT: Message queue telemetry transport for IoT integration
- REST: RESTful API integration for webhooks and HTTP endpoints
- NBus: Network bus communication for distributed systems
- TopBus: High-performance message bus for real-time data
Plugin Selection Guide¶
Use Case | Recommended Plugin | URI Scheme |
---|---|---|
RTSP video streaming | GStreamerWriter | rtsp:// |
Web video streaming | HLS | hls:// |
File data export | WriteData | json:// , csv:// , xml:// |
IoT messaging | MQTT | mqtt:// , mqtts:// |
Webhook integration | REST | http:// , https:// |
Network bus messaging | NBus | nbus:// |
High-performance messaging | TopBus | topbus:// |
ZeroMQ messaging | ZMQ | zmq:// |
Requirements¶
Software Dependencies¶
- File system write permissions (for file outputs)
- Network connectivity (for remote outputs)
- Optional: Database drivers for database outputs
- Optional: HTTP libraries for REST API outputs
Storage Requirements¶
- Varies by data volume and retention settings
- Recommend SSD storage for high-frequency outputs
- Monitor disk space for file-based outputs
Configuration¶
Basic Configuration¶
The Output plugin configuration defines handlers that utilize specialized output plugins:
{
"output": {
"handlers": {
"file-export": {
"uri": "writedata:///data/exports/",
"plugin": "WriteData",
"format": "json",
"enabled": true
},
"mqtt-events": {
"uri": "mqtt://localhost:1883",
"plugin": "MQTT",
"topic": "analytics/events",
"enabled": true
}
},
"exporters": {
"event-intrusion": {
"enabled": true,
"version": 1
}
}
}
}
Advanced Configuration¶
{
"output": {
"handlers": {
"primary-export": {
"uri": "writedata:///data/analytics/",
"plugin": "WriteData",
"sink": "output",
"format": "json",
"enabled": true,
"config": {
"rotation": {
"enabled": true,
"maxFileSize": "100MB",
"maxFiles": 10
}
}
},
"mqtt-stream": {
"uri": "mqtt://broker.example.com:1883",
"plugin": "MQTT",
"sink": "events",
"enabled": true,
"config": {
"topics": {
"events": "analytics/events",
"detections": "analytics/detections"
},
"qos": 1
}
},
"rest-api": {
"uri": "https://api.monitoring.com/webhooks",
"plugin": "REST",
"sink": "alerts",
"enabled": true,
"config": {
"authentication": {
"type": "bearer",
"token": "${API_TOKEN}"
},
"retry": {
"enabled": true,
"maxRetries": 3,
"backoffMs": 1000
}
}
},
"video-stream": {
"uri": "rtmp://streaming.example.com/live",
"plugin": "GStreamerWriter",
"sink": "video",
"enabled": true,
"config": {
"codec": "h264",
"bitrate": 2000000
}
},
"hls-output": {
"uri": "hls:///var/www/html/live/",
"plugin": "HLS",
"sink": "video",
"enabled": true,
"config": {
"segmentDuration": 2,
"playlistLength": 10
}
}
},
"exporters": {
"event-intrusion": {
"enabled": true,
"version": 1,
"includeMetadata": true,
"includeCrops": false
},
"event-area-enter": {
"enabled": true,
"version": 1,
"includeMetadata": true
},
"raw-detections": {
"enabled": false,
"version": 1,
"samplingRate": 0.1
},
"diagnostics": {
"enabled": true,
"version": 1,
"level": "error"
}
},
"global": {
"enableTimestamps": true,
"timestampFormat": "iso8601",
"bufferSize": 1000,
"flushInterval": 5000
}
}
}
Configuration Schema¶
Parameter | Type | Default | Description |
---|---|---|---|
handlers |
object | {} | Output handler configurations |
handlers.*.uri |
string | required | Output destination URI (plugin-specific) |
handlers.*.plugin |
string | auto-detected | Specialized plugin to use |
handlers.*.sink |
string | "output" | Data sink name to connect to |
handlers.*.enabled |
boolean | true | Enable/disable handler |
handlers.*.config |
object | {} | Plugin-specific configuration |
exporters |
object | {} | Event exporter configurations |
exporters.*.enabled |
boolean | true | Enable/disable exporter |
exporters.*.version |
integer | 1 | Schema version to use |
global.enableTimestamps |
boolean | true | Include timestamps in output |
global.timestampFormat |
string | "iso8601" | Timestamp format |
global.bufferSize |
integer | 1000 | Output buffer size |
global.flushInterval |
integer | 5000 | Buffer flush interval (ms) |
API Reference¶
C++ API¶
The Output plugin implements the iface::Output
interface:
class OutputImpl : public iface::Output {
public:
// Handler management
expected<std::shared_ptr<iface::OutputHandler>> addHandler(
const std::string& name,
const std::string& outputUri,
const std::string& sink = "output",
pCValue config = nullptr);
expected<void> removeHandler(const std::string& name);
expected<bool> hasHandler(const std::string& name);
expected<bool> loadHandlersFromConfig();
// Data processing
expected<bool> write();
void close();
void stop();
// Configuration
void verifyConfig(pCValue instanceConfig, ConfigCheckResults& results);
std::map<std::string, std::shared_ptr<ConfigDescriptor>> getConfigDescriptors() const;
};
Lua API¶
The Output plugin is accessible through the Lua scripting interface:
-- Create Output instance
local output = api.factory.output.create(instance, "OutputHandler")
-- Handler management
local handler = output:addHandler("name", "uri")
local handler = output:addHandler("name", "uri", "sink")
local handler = output:addHandler("name", "uri", "sink", config)
local success = output:removeHandler("handler-name")
local loaded = output:loadHandlersFromConfig()
-- Configuration
local config = output:getConfig()
output:saveConfig(newConfig)
-- Utility
local name = output:getName()
Note: The Lua API requires an instance parameter for creation, unlike some other plugins.
Supported Event Schemas¶
The Output plugin supports numerous event schemas with version management:
Security Events¶
event-intrusion
- Zone intrusion detectionevent-area-enter
- Area entry eventsevent-area-exit
- Area exit eventsevent-line-crossing
- Line crossing detectionevent-tailgating
- Tailgating detectionevent-armed-person
- Armed person detectionevent-fallen-person
- Fallen person detection
Behavioral Events¶
event-loitering
- Loitering detectionevent-dwelling
- Dwelling behaviorevent-activity
- Activity detectionevent-count-changed
- Object count changesevent-crowd-detection
- Crowd density eventsevent-object-left-removed
- Object abandonment
Data Exports¶
raw-detections
- Raw object detectionstracks
- Object tracking datatrack
- Individual track informationcrop
- Cropped object imagesattribute
- Object attributesdiagnostics
- System diagnostics
System Events¶
event-status-changed
- System status changes
Examples¶
Basic File Export¶
-- Create output instance
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "FileExporter")
-- Load configuration from instance settings
output:loadHandlersFromConfig()
-- Or manually add a file handler using WriteData plugin
local fileHandler = output:addHandler(
"analytics-export",
"writedata:///data/analytics/events.json",
"output",
{
format = "json",
prettyPrint = true
}
)
if fileHandler then
print("File export handler created successfully")
end
Multi-destination Export¶
local output = api.factory.output.create(instance, "MultiExporter")
-- Add multiple output handlers using different plugins
local handlers = {
{
name = "local-file",
uri = "writedata:///data/events/",
plugin = "WriteData",
sink = "output",
config = {
format = "json",
rotation = { enabled = true, maxFileSize = "50MB" }
}
},
{
name = "mqtt-stream",
uri = "mqtt://broker.example.com:1883",
plugin = "MQTT",
sink = "events",
config = {
topic = "analytics/events",
qos = 1
}
},
{
name = "rest-webhook",
uri = "https://api.example.com/webhooks/analytics",
plugin = "REST",
sink = "alerts",
config = {
method = "POST",
headers = {
["Authorization"] = "Bearer ${API_TOKEN}",
["Content-Type"] = "application/json"
}
}
},
{
name = "video-stream",
uri = "rtmp://live.example.com/stream",
plugin = "GStreamerWriter",
sink = "video",
config = {
codec = "h264",
bitrate = 1500000
}
}
}
for _, handler in ipairs(handlers) do
local success = output:addHandler(
handler.name,
handler.uri,
handler.sink,
handler.config
)
if success then
print("Added handler: " .. handler.name)
end
end
Event Filtering and Configuration¶
local output = api.factory.output.create(instance, "FilteredExporter")
-- Configure specific event types
local config = {
handlers = {
["security-events"] = {
uri = "file:///security/events/",
sink = "security",
enabled = true
}
},
exporters = {
["event-intrusion"] = {
enabled = true,
version = 1,
includeMetadata = true,
includeCrops = true
},
["event-line-crossing"] = {
enabled = true,
version = 1,
includeMetadata = false
},
["raw-detections"] = {
enabled = false -- Disable noisy detections
},
["diagnostics"] = {
enabled = true,
version = 1,
level = "warning" -- Only warnings and above
}
}
}
output:saveConfig(config)
output:loadHandlersFromConfig()
Custom Output Handler with Error Handling¶
local output = api.factory.output.create(instance, "RobustExporter")
-- Function to safely add handler with retry
function addHandlerSafely(output, name, uri, sink, config)
local maxRetries = 3
local retryDelay = 1000 -- ms
for attempt = 1, maxRetries do
local handler = output:addHandler(name, uri, sink, config)
if handler then
print(string.format("Handler '%s' added successfully on attempt %d", name, attempt))
return handler
else
print(string.format("Failed to add handler '%s', attempt %d", name, attempt))
if attempt < maxRetries then
-- No built-in sleep in Lua - implement delay in processing loop
-- Could use os.execute("sleep " .. retryDelay/1000) on Unix
retryDelay = retryDelay * 2 -- Exponential backoff
end
end
end
print(string.format("Failed to add handler '%s' after %d attempts", name, maxRetries))
return nil
end
-- Add handlers with error handling
addHandlerSafely(output, "primary", "file:///data/primary/", "output")
addHandlerSafely(output, "backup", "file:///backup/events/", "output")
Integration with Event System¶
-- Create output for event processing
local output = api.factory.output.create(instance, "EventProcessor")
-- Configure for real-time event streaming
local streamConfig = {
handlers = {
["event-stream"] = {
uri = "mqtt://localhost:1883",
sink = "events",
enabled = true
}
},
exporters = {
["event-intrusion"] = { enabled = true, version = 1 },
["event-area-enter"] = { enabled = true, version = 1 },
["event-line-crossing"] = { enabled = true, version = 1 }
},
global = {
enableTimestamps = true,
timestampFormat = "iso8601",
bufferSize = 100, -- Small buffer for real-time
flushInterval = 1000 -- Flush every second
}
}
output:saveConfig(streamConfig)
output:loadHandlersFromConfig()
-- Event registration is solution-specific
-- No generic event system exists in the API
-- Solutions may implement callbacks differently
Best Practices¶
Plugin Selection¶
- Choose the right plugin for each output type:
- Use WriteData for file-based exports
- Use MQTT for IoT and message queue integration
- Use REST for webhooks and API integration
- Use GStreamerWriter for video output
-
Use HLS for web-based video streaming
-
Avoid duplicating functionality - let specialized plugins handle their domains
Handler Management¶
- Use descriptive handler names for easy identification
- Group related outputs using consistent naming conventions
- Monitor handler health and implement fallback mechanisms
- Configure appropriate buffer sizes based on data volume
Performance Optimization¶
- Batch outputs when possible to reduce I/O overhead
- Use appropriate flush intervals balancing latency and performance
- Implement output filtering to reduce unnecessary data export
- Monitor disk space for file-based outputs
Configuration Management¶
- Use environment variables for sensitive configuration (API keys, passwords)
- Implement configuration validation before deployment
- Version your configuration files for rollback capability
- Document custom schemas and configuration options
Error Handling¶
- Implement retry logic for network-based outputs
- Configure dead letter queues for failed exports
- Monitor output health and alert on failures
- Test failover scenarios regularly
Security¶
- Encrypt sensitive outputs when transmitting over networks
- Use secure authentication for external systems
- Implement access controls on output destinations
- Audit export activities for compliance requirements
Troubleshooting¶
Handler Creation Issues¶
-
"Handler creation failed"
- Verify URI format and accessibility
- Check file system permissions
- Validate network connectivity for remote outputs
- Ensure required authentication credentials
-
"Configuration validation error"
- Verify JSON syntax in configuration
- Check required parameters are present
- Validate parameter types and ranges
- Review schema version compatibility
Output Performance Issues¶
-
"Slow output processing"
- Increase buffer size to batch more data
- Reduce flush interval for real-time requirements
- Check I/O performance of destination
- Consider using SSD storage for file outputs
-
"High memory usage"
- Reduce buffer size if memory constrained
- Implement output filtering to reduce data volume
- Check for memory leaks in custom handlers
- Monitor garbage collection in Lua scripts
Data Export Issues¶
-
"Missing events in output"
- Verify event exporters are enabled
- Check event filtering configuration
- Monitor buffer overflow conditions
- Validate schema version compatibility
-
"Corrupted output files"
- Check disk space availability
- Verify file system health
- Implement proper file rotation
- Use atomic write operations
Network Output Issues¶
-
"Connection timeouts"
- Check network connectivity and latency
- Verify firewall rules allow outbound connections
- Implement connection retry logic
- Use appropriate timeout values
-
"Authentication failures"
- Verify credentials are correct and current
- Check token expiration and renewal
- Validate API key permissions
- Monitor authentication logs
Integration Examples¶
SIEM Integration¶
-- Configure for SIEM integration using REST plugin
local siem_config = {
handlers = {
["siem-webhook"] = {
uri = "https://siem.company.com/api/events",
plugin = "REST",
sink = "security",
enabled = true,
config = {
method = "POST",
headers = {
["X-API-Key"] = "${SIEM_API_KEY}"
},
format = "json"
}
}
},
exporters = {
["event-intrusion"] = { enabled = true, version = 1 },
["event-armed-person"] = { enabled = true, version = 1 },
["event-tailgating"] = { enabled = true, version = 1 },
["diagnostics"] = { enabled = true, version = 1, level = "error" }
}
}
Database Export¶
-- Configure for database export via REST API
local db_config = {
handlers = {
["database-api"] = {
uri = "https://db-api.company.com/v1/events",
plugin = "REST",
sink = "database",
enabled = true,
config = {
method = "POST",
batchSize = 100,
headers = {
["Content-Type"] = "application/json",
["Authorization"] = "Basic ${DB_CREDENTIALS}"
}
}
}
},
exporters = {
["raw-detections"] = {
enabled = true,
version = 1,
samplingRate = 0.05 -- 5% sampling for database
},
["tracks"] = { enabled = true, version = 1 }
}
}
Cloud Platform Integration¶
-- Configure for cloud platform integration using appropriate plugins
local cloud_config = {
handlers = {
["aws-api"] = {
uri = "https://api.gateway.amazonaws.com/prod/events",
plugin = "REST",
sink = "archive",
enabled = true,
config = {
method = "PUT",
headers = {
["x-api-key"] = "${AWS_API_KEY}"
},
region = "us-east-1"
}
},
["azure-mqtt"] = {
uri = "mqtts://iothub.azure-devices.net:8883",
plugin = "MQTT",
sink = "stream",
enabled = true,
config = {
clientId = "${AZURE_DEVICE_ID}",
username = "${AZURE_USERNAME}",
password = "${AZURE_SAS_TOKEN}",
topic = "devices/${AZURE_DEVICE_ID}/messages/events/"
}
}
}
}
See Also¶
Specialized Output Plugins¶
For detailed configuration and examples of specific output types, see:
- GStreamerWriter Plugin - Video stream output and encoding
- HLS Plugin - HTTP Live Streaming for web delivery
- WriteData Plugin - File-based data export (JSON, CSV, XML)
- MQTT Plugin - MQTT messaging for IoT integration
- REST Plugin - RESTful API integration and webhooks
- NBus Plugin - Network bus communication
- TopBus Plugin - High-performance message bus