Skip to content

Output Plugin

Description

The Output plugin serves as CVEDIA-RT's central coordinator for data export and output management. Rather than implementing all output functionality itself, it delegates to specialized output plugins while providing a unified interface for managing multiple output handlers. This architecture allows flexible routing of analytics results, events, and metadata to various destinations through purpose-built plugins.

The Output plugin acts as an orchestration layer that:

  • Manages multiple output handlers simultaneously
  • Routes data to appropriate specialized plugins
  • Provides unified configuration and lifecycle management
  • Handles event schema versioning and validation
  • Coordinates between different output destinations

Key Features

  • Plugin Coordination: Delegates to specialized plugins for actual output operations
  • Handler Management: Create and manage multiple output handlers from different plugins
  • Event Schema Management: Version-controlled event formats across all output types
  • Unified Configuration: Single configuration point for all output destinations
  • Dynamic Routing: Route different event types to appropriate output plugins
  • Lifecycle Management: Coordinate initialization, operation, and cleanup of output handlers
  • Plugin Integration: Seamlessly works with GStreamerWriter, HLS, WriteData, MQTT, REST, and other output plugins

When to Use

  • Managing multiple output destinations simultaneously
  • Routing different event types to different output plugins
  • Centralizing output configuration and management
  • Implementing complex output workflows with multiple handlers
  • Coordinating between file, streaming, and messaging outputs
  • Building flexible data export pipelines

Available Output Plugins

The Output plugin coordinates with these specialized plugins:

  • GStreamerWriter: Video stream output and encoding
  • HLS: HTTP Live Streaming for web-based video delivery
  • WriteData: File-based data export (JSON, CSV, XML)
  • MQTT: Message queue telemetry transport for IoT integration
  • REST: RESTful API integration for webhooks and HTTP endpoints
  • NBus: Network bus communication for distributed systems
  • TopBus: High-performance message bus for real-time data

Plugin Selection Guide

Use Case Recommended Plugin URI Scheme
RTSP video streaming GStreamerWriter rtsp://
Web video streaming HLS hls://
File data export WriteData json://, csv://, xml://
IoT messaging MQTT mqtt://, mqtts://
Webhook integration REST http://, https://
Network bus messaging NBus nbus://
High-performance messaging TopBus topbus://
ZeroMQ messaging ZMQ zmq://

Requirements

Software Dependencies

  • File system write permissions (for file outputs)
  • Network connectivity (for remote outputs)
  • Optional: Database drivers for database outputs
  • Optional: HTTP libraries for REST API outputs

Storage Requirements

  • Varies by data volume and retention settings
  • Recommend SSD storage for high-frequency outputs
  • Monitor disk space for file-based outputs

Configuration

Basic Configuration

The Output plugin configuration defines handlers that utilize specialized output plugins:

{
  "output": {
    "handlers": {
      "file-export": {
        "uri": "writedata:///data/exports/",
        "plugin": "WriteData",
        "format": "json",
        "enabled": true
      },
      "mqtt-events": {
        "uri": "mqtt://localhost:1883",
        "plugin": "MQTT",
        "topic": "analytics/events",
        "enabled": true
      }
    },
    "exporters": {
      "event-intrusion": {
        "enabled": true,
        "version": 1
      }
    }
  }
}

Advanced Configuration

{
  "output": {
    "handlers": {
      "primary-export": {
        "uri": "writedata:///data/analytics/",
        "plugin": "WriteData",
        "sink": "output",
        "format": "json",
        "enabled": true,
        "config": {
          "rotation": {
            "enabled": true,
            "maxFileSize": "100MB",
            "maxFiles": 10
          }
        }
      },
      "mqtt-stream": {
        "uri": "mqtt://broker.example.com:1883",
        "plugin": "MQTT",
        "sink": "events",
        "enabled": true,
        "config": {
          "topics": {
            "events": "analytics/events",
            "detections": "analytics/detections"
          },
          "qos": 1
        }
      },
      "rest-api": {
        "uri": "https://api.monitoring.com/webhooks",
        "plugin": "REST",
        "sink": "alerts",
        "enabled": true,
        "config": {
          "authentication": {
            "type": "bearer",
            "token": "${API_TOKEN}"
          },
          "retry": {
            "enabled": true,
            "maxRetries": 3,
            "backoffMs": 1000
          }
        }
      },
      "video-stream": {
        "uri": "rtmp://streaming.example.com/live",
        "plugin": "GStreamerWriter",
        "sink": "video",
        "enabled": true,
        "config": {
          "codec": "h264",
          "bitrate": 2000000
        }
      },
      "hls-output": {
        "uri": "hls:///var/www/html/live/",
        "plugin": "HLS",
        "sink": "video",
        "enabled": true,
        "config": {
          "segmentDuration": 2,
          "playlistLength": 10
        }
      }
    },
    "exporters": {
      "event-intrusion": {
        "enabled": true,
        "version": 1,
        "includeMetadata": true,
        "includeCrops": false
      },
      "event-area-enter": {
        "enabled": true,
        "version": 1,
        "includeMetadata": true
      },
      "raw-detections": {
        "enabled": false,
        "version": 1,
        "samplingRate": 0.1
      },
      "diagnostics": {
        "enabled": true,
        "version": 1,
        "level": "error"
      }
    },
    "global": {
      "enableTimestamps": true,
      "timestampFormat": "iso8601",
      "bufferSize": 1000,
      "flushInterval": 5000
    }
  }
}

Configuration Schema

Parameter Type Default Description
handlers object {} Output handler configurations
handlers.*.uri string required Output destination URI (plugin-specific)
handlers.*.plugin string auto-detected Specialized plugin to use
handlers.*.sink string "output" Data sink name to connect to
handlers.*.enabled boolean true Enable/disable handler
handlers.*.config object {} Plugin-specific configuration
exporters object {} Event exporter configurations
exporters.*.enabled boolean true Enable/disable exporter
exporters.*.version integer 1 Schema version to use
global.enableTimestamps boolean true Include timestamps in output
global.timestampFormat string "iso8601" Timestamp format
global.bufferSize integer 1000 Output buffer size
global.flushInterval integer 5000 Buffer flush interval (ms)

API Reference

C++ API

The Output plugin implements the iface::Output interface:

class OutputImpl : public iface::Output {
public:
    // Handler management
    expected<std::shared_ptr<iface::OutputHandler>> addHandler(
        const std::string& name, 
        const std::string& outputUri, 
        const std::string& sink = "output", 
        pCValue config = nullptr);

    expected<void> removeHandler(const std::string& name);
    expected<bool> hasHandler(const std::string& name);
    expected<bool> loadHandlersFromConfig();

    // Data processing
    expected<bool> write();
    void close();
    void stop();

    // Configuration
    void verifyConfig(pCValue instanceConfig, ConfigCheckResults& results);
    std::map<std::string, std::shared_ptr<ConfigDescriptor>> getConfigDescriptors() const;
};

Lua API

The Output plugin is accessible through the Lua scripting interface:

-- Create Output instance
local output = api.factory.output.create(instance, "OutputHandler")

-- Handler management
local handler = output:addHandler("name", "uri")
local handler = output:addHandler("name", "uri", "sink")
local handler = output:addHandler("name", "uri", "sink", config)

local success = output:removeHandler("handler-name")
local loaded = output:loadHandlersFromConfig()

-- Configuration
local config = output:getConfig()
output:saveConfig(newConfig)

-- Utility
local name = output:getName()

Note: The Lua API requires an instance parameter for creation, unlike some other plugins.

Full Lua API Reference →

Supported Event Schemas

The Output plugin supports numerous event schemas with version management:

Security Events

  • event-intrusion - Zone intrusion detection
  • event-area-enter - Area entry events
  • event-area-exit - Area exit events
  • event-line-crossing - Line crossing detection
  • event-tailgating - Tailgating detection
  • event-armed-person - Armed person detection
  • event-fallen-person - Fallen person detection

Behavioral Events

  • event-loitering - Loitering detection
  • event-dwelling - Dwelling behavior
  • event-activity - Activity detection
  • event-count-changed - Object count changes
  • event-crowd-detection - Crowd density events
  • event-object-left-removed - Object abandonment

Data Exports

  • raw-detections - Raw object detections
  • tracks - Object tracking data
  • track - Individual track information
  • crop - Cropped object images
  • attribute - Object attributes
  • diagnostics - System diagnostics

System Events

  • event-status-changed - System status changes

Examples

Basic File Export

-- Create output instance
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "FileExporter")

-- Load configuration from instance settings
output:loadHandlersFromConfig()

-- Or manually add a file handler using WriteData plugin
local fileHandler = output:addHandler(
    "analytics-export",
    "writedata:///data/analytics/events.json",
    "output",
    {
        format = "json",
        prettyPrint = true
    }
)

if fileHandler then
    print("File export handler created successfully")
end

Multi-destination Export

local output = api.factory.output.create(instance, "MultiExporter")

-- Add multiple output handlers using different plugins
local handlers = {
    {
        name = "local-file",
        uri = "writedata:///data/events/",
        plugin = "WriteData",
        sink = "output",
        config = {
            format = "json",
            rotation = { enabled = true, maxFileSize = "50MB" }
        }
    },
    {
        name = "mqtt-stream",
        uri = "mqtt://broker.example.com:1883",
        plugin = "MQTT",
        sink = "events",
        config = {
            topic = "analytics/events",
            qos = 1
        }
    },
    {
        name = "rest-webhook",
        uri = "https://api.example.com/webhooks/analytics",
        plugin = "REST",
        sink = "alerts",
        config = {
            method = "POST",
            headers = {
                ["Authorization"] = "Bearer ${API_TOKEN}",
                ["Content-Type"] = "application/json"
            }
        }
    },
    {
        name = "video-stream",
        uri = "rtmp://live.example.com/stream",
        plugin = "GStreamerWriter",
        sink = "video",
        config = {
            codec = "h264",
            bitrate = 1500000
        }
    }
}

for _, handler in ipairs(handlers) do
    local success = output:addHandler(
        handler.name,
        handler.uri,
        handler.sink,
        handler.config
    )
    if success then
        print("Added handler: " .. handler.name)
    end
end

Event Filtering and Configuration

local output = api.factory.output.create(instance, "FilteredExporter")

-- Configure specific event types
local config = {
    handlers = {
        ["security-events"] = {
            uri = "file:///security/events/",
            sink = "security",
            enabled = true
        }
    },
    exporters = {
        ["event-intrusion"] = {
            enabled = true,
            version = 1,
            includeMetadata = true,
            includeCrops = true
        },
        ["event-line-crossing"] = {
            enabled = true,
            version = 1,
            includeMetadata = false
        },
        ["raw-detections"] = {
            enabled = false  -- Disable noisy detections
        },
        ["diagnostics"] = {
            enabled = true,
            version = 1,
            level = "warning"  -- Only warnings and above
        }
    }
}

output:saveConfig(config)
output:loadHandlersFromConfig()

Custom Output Handler with Error Handling

local output = api.factory.output.create(instance, "RobustExporter")

-- Function to safely add handler with retry
function addHandlerSafely(output, name, uri, sink, config)
    local maxRetries = 3
    local retryDelay = 1000 -- ms

    for attempt = 1, maxRetries do
        local handler = output:addHandler(name, uri, sink, config)
        if handler then
            print(string.format("Handler '%s' added successfully on attempt %d", name, attempt))
            return handler
        else
            print(string.format("Failed to add handler '%s', attempt %d", name, attempt))
            if attempt < maxRetries then
                -- No built-in sleep in Lua - implement delay in processing loop
                -- Could use os.execute("sleep " .. retryDelay/1000) on Unix
                retryDelay = retryDelay * 2  -- Exponential backoff
            end
        end
    end

    print(string.format("Failed to add handler '%s' after %d attempts", name, maxRetries))
    return nil
end

-- Add handlers with error handling
addHandlerSafely(output, "primary", "file:///data/primary/", "output")
addHandlerSafely(output, "backup", "file:///backup/events/", "output")

Integration with Event System

-- Create output for event processing
local output = api.factory.output.create(instance, "EventProcessor")

-- Configure for real-time event streaming
local streamConfig = {
    handlers = {
        ["event-stream"] = {
            uri = "mqtt://localhost:1883",
            sink = "events",
            enabled = true
        }
    },
    exporters = {
        ["event-intrusion"] = { enabled = true, version = 1 },
        ["event-area-enter"] = { enabled = true, version = 1 },
        ["event-line-crossing"] = { enabled = true, version = 1 }
    },
    global = {
        enableTimestamps = true,
        timestampFormat = "iso8601",
        bufferSize = 100,  -- Small buffer for real-time
        flushInterval = 1000  -- Flush every second
    }
}

output:saveConfig(streamConfig)
output:loadHandlersFromConfig()

-- Event registration is solution-specific
-- No generic event system exists in the API
-- Solutions may implement callbacks differently

Best Practices

Plugin Selection

  1. Choose the right plugin for each output type:
  2. Use WriteData for file-based exports
  3. Use MQTT for IoT and message queue integration
  4. Use REST for webhooks and API integration
  5. Use GStreamerWriter for video output
  6. Use HLS for web-based video streaming

  7. Avoid duplicating functionality - let specialized plugins handle their domains

Handler Management

  1. Use descriptive handler names for easy identification
  2. Group related outputs using consistent naming conventions
  3. Monitor handler health and implement fallback mechanisms
  4. Configure appropriate buffer sizes based on data volume

Performance Optimization

  1. Batch outputs when possible to reduce I/O overhead
  2. Use appropriate flush intervals balancing latency and performance
  3. Implement output filtering to reduce unnecessary data export
  4. Monitor disk space for file-based outputs

Configuration Management

  1. Use environment variables for sensitive configuration (API keys, passwords)
  2. Implement configuration validation before deployment
  3. Version your configuration files for rollback capability
  4. Document custom schemas and configuration options

Error Handling

  1. Implement retry logic for network-based outputs
  2. Configure dead letter queues for failed exports
  3. Monitor output health and alert on failures
  4. Test failover scenarios regularly

Security

  1. Encrypt sensitive outputs when transmitting over networks
  2. Use secure authentication for external systems
  3. Implement access controls on output destinations
  4. Audit export activities for compliance requirements

Troubleshooting

Handler Creation Issues

  1. "Handler creation failed"

    • Verify URI format and accessibility
    • Check file system permissions
    • Validate network connectivity for remote outputs
    • Ensure required authentication credentials
  2. "Configuration validation error"

    • Verify JSON syntax in configuration
    • Check required parameters are present
    • Validate parameter types and ranges
    • Review schema version compatibility

Output Performance Issues

  1. "Slow output processing"

    • Increase buffer size to batch more data
    • Reduce flush interval for real-time requirements
    • Check I/O performance of destination
    • Consider using SSD storage for file outputs
  2. "High memory usage"

    • Reduce buffer size if memory constrained
    • Implement output filtering to reduce data volume
    • Check for memory leaks in custom handlers
    • Monitor garbage collection in Lua scripts

Data Export Issues

  1. "Missing events in output"

    • Verify event exporters are enabled
    • Check event filtering configuration
    • Monitor buffer overflow conditions
    • Validate schema version compatibility
  2. "Corrupted output files"

    • Check disk space availability
    • Verify file system health
    • Implement proper file rotation
    • Use atomic write operations

Network Output Issues

  1. "Connection timeouts"

    • Check network connectivity and latency
    • Verify firewall rules allow outbound connections
    • Implement connection retry logic
    • Use appropriate timeout values
  2. "Authentication failures"

    • Verify credentials are correct and current
    • Check token expiration and renewal
    • Validate API key permissions
    • Monitor authentication logs

Integration Examples

SIEM Integration

-- Configure for SIEM integration using REST plugin
local siem_config = {
    handlers = {
        ["siem-webhook"] = {
            uri = "https://siem.company.com/api/events",
            plugin = "REST",
            sink = "security",
            enabled = true,
            config = {
                method = "POST",
                headers = {
                    ["X-API-Key"] = "${SIEM_API_KEY}"
                },
                format = "json"
            }
        }
    },
    exporters = {
        ["event-intrusion"] = { enabled = true, version = 1 },
        ["event-armed-person"] = { enabled = true, version = 1 },
        ["event-tailgating"] = { enabled = true, version = 1 },
        ["diagnostics"] = { enabled = true, version = 1, level = "error" }
    }
}

Database Export

-- Configure for database export via REST API
local db_config = {
    handlers = {
        ["database-api"] = {
            uri = "https://db-api.company.com/v1/events",
            plugin = "REST",
            sink = "database",
            enabled = true,
            config = {
                method = "POST",
                batchSize = 100,
                headers = {
                    ["Content-Type"] = "application/json",
                    ["Authorization"] = "Basic ${DB_CREDENTIALS}"
                }
            }
        }
    },
    exporters = {
        ["raw-detections"] = { 
            enabled = true, 
            version = 1,
            samplingRate = 0.05  -- 5% sampling for database
        },
        ["tracks"] = { enabled = true, version = 1 }
    }
}

Cloud Platform Integration

-- Configure for cloud platform integration using appropriate plugins
local cloud_config = {
    handlers = {
        ["aws-api"] = {
            uri = "https://api.gateway.amazonaws.com/prod/events",
            plugin = "REST",
            sink = "archive",
            enabled = true,
            config = {
                method = "PUT",
                headers = {
                    ["x-api-key"] = "${AWS_API_KEY}"
                },
                region = "us-east-1"
            }
        },
        ["azure-mqtt"] = {
            uri = "mqtts://iothub.azure-devices.net:8883",
            plugin = "MQTT",
            sink = "stream",
            enabled = true,
            config = {
                clientId = "${AZURE_DEVICE_ID}",
                username = "${AZURE_USERNAME}",
                password = "${AZURE_SAS_TOKEN}",
                topic = "devices/${AZURE_DEVICE_ID}/messages/events/"
            }
        }
    }
}

See Also

Specialized Output Plugins

For detailed configuration and examples of specific output types, see: