EventAction Plugin

Description

EventAction is the event processing and response plugin for CVEDIA-RT. It responds to analytics events with configurable actions: disk storage, REST API calls, webhook notifications, and custom event-handling scripts.

The plugin acts as the central event hub of a CVEDIA-RT system, automating responses to detection events, system alerts, and custom analytics triggers. It supports multiple action types, event queuing, script execution, and integration with external systems.

Key Features

  • Multiple Action Types: Support for disk storage, REST API, webhook, and custom script actions
  • Event Queue Management: Reliable event queuing and processing with retry mechanisms
  • Script Execution: Custom Lua script execution for complex event handling logic
  • REST API Integration: HTTP/HTTPS REST API calls for external system integration
  • Disk Storage: Persistent event logging and data storage capabilities
  • Event Filtering: Configurable event filtering and routing based on event properties
  • Asynchronous Processing: Non-blocking event processing for high-performance operation
  • Error Handling: Robust error handling and recovery mechanisms
  • Event Batching: Support for batching multiple events for efficient processing
  • Configuration Reloading: Dynamic configuration updates without system restart

Requirements

Hardware Requirements

  • CPU: Multi-core processor for concurrent event processing
  • Memory: Minimum 1GB RAM (2GB+ recommended for high-volume event processing)
  • Storage: Sufficient disk space for event logs and queued events
  • Network: Network connectivity for REST API and webhook actions

Software Dependencies

  • RTCORE: CVEDIA-RT core library for plugin infrastructure
  • HTTP Client Library: HTTP/HTTPS client for REST API and webhook calls
  • JSON Library: JSON parsing and generation for event data
  • File System Libraries: File I/O operations for disk storage actions
  • Threading Library: Multi-threading support for concurrent event processing
  • Lua Runtime: Lua scripting engine for custom event handlers

Network Requirements

  • HTTP/HTTPS Access: Outbound network access for REST API and webhook actions
  • DNS Resolution: DNS resolution for external service endpoints
  • Firewall Configuration: Appropriate firewall rules for outbound connections
  • SSL/TLS Support: SSL/TLS support for secure HTTPS connections

Configuration

Basic Configuration

{
  "eventaction": {
    "actions": [
      {
        "type": "disk",
        "uri": "/var/log/cvedia/events",
        "enabled": true
      },
      {
        "type": "rest",
        "uri": "https://api.example.com/events",
        "enabled": true
      }
    ],
    "queue_size": 1000,
    "retry_attempts": 3,
    "timeout": 30
  }
}

Advanced Configuration

{
  "eventaction": {
    "actions": [
      {
        "type": "disk",
        "uri": "/var/log/cvedia/events",
        "enabled": true,
        "parameters": {
          "log_format": "json",
          "rotate_size": "100MB",
          "max_files": 10,
          "compress": true
        }
      },
      {
        "type": "rest",
        "uri": "https://api.example.com/events",
        "enabled": true,
        "parameters": {
          "method": "POST",
          "content_type": "application/json",
          "timeout": 30,
          "retry_attempts": 3,
          "headers": {
            "Authorization": "Bearer ${API_TOKEN}",
            "X-Client-ID": "cvedia-rt"
          }
        }
      },
      {
        "type": "webhook",
        "uri": "https://hooks.example.com/cvedia",
        "enabled": true,
        "parameters": {
          "secret": "${WEBHOOK_SECRET}",
          "signature_header": "X-Hub-Signature",
          "event_filters": ["motion_detected", "object_detected"]
        }
      },
      {
        "type": "script",
        "uri": "/etc/cvedia/scripts/custom_handler.lua",
        "enabled": true,
        "parameters": {
          "script_timeout": 60,
          "max_memory": "50MB"
        }
      }
    ],
    "processing": {
      "queue_size": 5000,
      "batch_size": 10,
      "batch_timeout": 1000,
      "worker_threads": 4,
      "retry_attempts": 3,
      "retry_delay": 1000
    },
    "filtering": {
      "event_types": ["motion_detected", "object_detected", "zone_violation"],
      "min_confidence": 0.7,
      "zone_filters": ["entrance", "restricted_area"]
    }
  }
}
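
The `filtering` block above implies that an event is only acted on when its type, confidence, and zone all match. The following is a minimal sketch of such a check, not the plugin's implementation. `FilterConfig`, `FilterEvent`, and `passesFilter` are hypothetical names, and treating an empty list as "accept everything" is an assumption.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Hypothetical mirror of the "filtering" config block; not a plugin type.
struct FilterConfig {
    std::vector<std::string> event_types;   // empty = accept every type (assumption)
    double min_confidence;                  // events below this score are dropped
    std::vector<std::string> zone_filters;  // empty = accept every zone (assumption)
};

// Hypothetical minimal event view used only for this sketch.
struct FilterEvent {
    std::string event_type;
    double confidence;
    std::string zone;
};

// Returns true when value is listed, or when the list is empty.
inline bool listedOrUnrestricted(const std::vector<std::string>& allowed,
                                 const std::string& value) {
    return allowed.empty() ||
           std::find(allowed.begin(), allowed.end(), value) != allowed.end();
}

// An event passes only if it satisfies all three criteria.
inline bool passesFilter(const FilterConfig& cfg, const FilterEvent& ev) {
    return ev.confidence >= cfg.min_confidence &&
           listedOrUnrestricted(cfg.event_types, ev.event_type) &&
           listedOrUnrestricted(cfg.zone_filters, ev.zone);
}
```

With the values from the configuration above, a `motion_detected` event in zone `entrance` with confidence 0.9 would pass, while the same event with confidence 0.5 would be dropped.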

Configuration Schema

| Parameter | Type | Default | Description |
|---|---|---|---|
| actions | array | [] | List of event action configurations |
| actions[].type | string | required | Action type ("disk", "rest", "webhook", "script") |
| actions[].uri | string | required | Action URI or endpoint |
| actions[].enabled | bool | true | Enable/disable this action |
| actions[].parameters | object | {} | Action-specific parameters |
| queue_size | int | 1000 | Maximum number of queued events |
| batch_size | int | 1 | Number of events to batch together |
| batch_timeout | int | 1000 | Timeout for batching events (ms) |
| worker_threads | int | 2 | Number of worker threads for processing |
| retry_attempts | int | 3 | Number of retry attempts for failed actions |
| retry_delay | int | 1000 | Delay between retry attempts (ms) |
| timeout | int | 30 | Default timeout for actions (seconds) |
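
The source does not spell out how `batch_size` and `batch_timeout` interact. A common reading is that a batch is sent as soon as it is full, or once its oldest event has waited for the timeout. The sketch below illustrates that reading only; `EventBatcher` is a hypothetical class and events are represented as plain strings.

```cpp
#include <chrono>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical batcher: collects events and reports when a batch is ready.
class EventBatcher {
public:
    using Clock = std::chrono::steady_clock;

    EventBatcher(std::size_t batchSize, std::chrono::milliseconds batchTimeout)
        : batchSize_(batchSize == 0 ? 1 : batchSize), batchTimeout_(batchTimeout) {}

    // Adds an event; the first event of a batch starts the timeout window.
    void add(std::string event, Clock::time_point now) {
        if (pending_.empty()) {
            windowStart_ = now;
        }
        pending_.push_back(std::move(event));
    }

    // A batch is ready when it is full or its oldest event has waited long enough.
    bool ready(Clock::time_point now) const {
        if (pending_.empty()) return false;
        return pending_.size() >= batchSize_ || (now - windowStart_) >= batchTimeout_;
    }

    // Hands the pending events to the caller and starts a fresh batch.
    std::vector<std::string> flush() {
        std::vector<std::string> out;
        out.swap(pending_);
        return out;
    }

private:
    std::size_t batchSize_;
    std::chrono::milliseconds batchTimeout_;
    std::vector<std::string> pending_;
    Clock::time_point windowStart_{};
};
```

Under this reading, the default `batch_size` of 1 makes every event ready immediately, so batching is effectively off until the value is raised.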

API Reference

C++ API (EventActionManaged)

Core Event Processing

class EventActionManaged : public ModuleImpl {
public:
    // Configuration and initialization
    expected<void> loadConfig(pCValue config) override;
    expected<pCValue> getConfigDescriptors() override;

    // Event processing
    expected<void> process(pCValue eventObj);
    expected<void> addEventToQueue(const std::string& eventId, pCValue eventData);
    expected<cvec> getEventActions();

    // Script processing
    expected<pCValue> processScript(pCValue config, pCValue data);

    // Queue management
    expected<size_t> getQueueSize();
    expected<void> clearQueue();
    expected<pCValue> getQueueStats();
};

Action Configuration Structure

struct ActionConfig {
    std::string type;           // Action type ("disk", "rest", "webhook", "script")
    std::string uri;            // Action URI or endpoint
    bool enabled = true;        // Enable/disable action
    pCValue parameters;         // Action-specific parameters
    int timeout = 30;           // Action timeout (seconds)
    int retry_attempts = 3;     // Number of retry attempts
};

struct EventActionConfig {
    std::vector<ActionConfig> actions;  // List of configured actions
    size_t queue_size = 1000;          // Event queue size
    size_t batch_size = 1;             // Event batching size
    int batch_timeout = 1000;          // Batching timeout (ms)
    int worker_threads = 2;            // Number of worker threads
    int retry_attempts = 3;            // Default retry attempts
    int retry_delay = 1000;            // Delay between retries (ms)
};

Event Data Structure

struct EventData {
    std::string event_id;           // Unique event identifier
    std::string event_type;         // Type of event ("motion_detected", etc.)
    double timestamp;               // Event timestamp
    pCValue metadata;               // Event metadata and properties
    std::string source_id;          // Source identifier (camera, sensor, etc.)
    float confidence;               // Event confidence score
    pCValue location_data;          // Spatial location information
};

Action-Specific APIs

Disk Storage Action

class EventActionDisk {
public:
    struct DiskActionConfig {
        std::string log_format = "json";    // Log format ("json", "csv", "text")
        std::string rotate_size = "100MB";  // Log rotation size
        int max_files = 10;                 // Maximum log files to keep
        bool compress = true;               // Compress rotated files
        std::string filename_pattern = "events_%Y%m%d_%H%M%S.log";
    };

    expected<void> writeEvent(const EventData& event, const DiskActionConfig& config);
    expected<void> rotateLogFiles();
};
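
The default `filename_pattern` uses `strftime`-style specifiers (`%Y`, `%m`, `%d`, and so on). Assuming the plugin expands the pattern that way, which the source does not confirm, the resulting file name can be previewed with a small helper. `expandFilenamePattern` is a hypothetical name, and UTC is used here only to keep the output independent of the local time zone.

```cpp
#include <ctime>
#include <string>

// Expands a strftime-style pattern such as "events_%Y%m%d_%H%M%S.log".
// Assumption: the plugin interprets filename_pattern in this way.
inline std::string expandFilenamePattern(const std::string& pattern, std::time_t when) {
    std::tm parts{};
#if defined(_WIN32)
    gmtime_s(&parts, &when);   // MSVC variant: (tm*, const time_t*)
#else
    gmtime_r(&when, &parts);   // POSIX variant: (const time_t*, tm*)
#endif
    char buffer[256];
    const std::size_t written =
        std::strftime(buffer, sizeof(buffer), pattern.c_str(), &parts);
    return std::string(buffer, written);  // empty if the result did not fit
}
```

For example, `expandFilenamePattern("events_%Y%m%d_%H%M%S.log", 0)` yields `events_19700101_000000.log`.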

REST API Action

class EventActionRest {
public:
    struct RestActionConfig {
        std::string method = "POST";            // HTTP method
        std::string content_type = "application/json";  // Content type
        std::map<std::string, std::string> headers;     // HTTP headers
        int timeout = 30;                       // Request timeout (seconds)
        bool verify_ssl = true;                 // Verify SSL certificates
    };

    expected<void> sendEvent(const EventData& event, const RestActionConfig& config);
    expected<void> sendBatchedEvents(const std::vector<EventData>& events, const RestActionConfig& config);
};

Webhook Action

class EventActionWebhook {
public:
    struct WebhookConfig {
        std::string secret;                     // Webhook secret for signature
        std::string signature_header = "X-Hub-Signature";  // Signature header name
        std::vector<std::string> event_filters; // Event type filters
        std::string payload_template;           // Custom payload template
    };

    expected<void> sendWebhook(const EventData& event, const WebhookConfig& config);
    std::string generateSignature(const std::string& payload, const std::string& secret);
};

Lua API

Event Action Setup

-- Create event action instance
local eventAction = api.factory.eventaction.create(instance, "event_processor")

-- Configure event actions
eventAction:configure({
    actions = {
        {
            type = "disk",
            uri = "/var/log/events",
            enabled = true,
            parameters = {
                log_format = "json",
                rotate_size = "50MB",
                max_files = 5
            }
        },
        {
            type = "rest",
            uri = "https://api.monitoring.com/events",
            enabled = true,
            parameters = {
                method = "POST",
                timeout = 30,
                headers = {
                    ["Authorization"] = "Bearer " .. api.config.get("api_token"),
                    ["Content-Type"] = "application/json"
                }
            }
        }
    },
    queue_size = 2000,
    worker_threads = 3,
    retry_attempts = 5
})

-- Load configuration
local success = eventAction:loadConfig(eventAction.config)
if success then
    api.logging.LogInfo("Event action system initialized successfully")
end

Event Processing

-- Process individual events
function processAnalyticsEvent(eventType, eventData)
    local eventObj = {
        event_id = "evt_" .. os.time() .. "_" .. math.random(1000, 9999),
        event_type = eventType,
        timestamp = os.time(),
        source_id = instance.name,
        metadata = eventData,
        confidence = eventData.confidence or 1.0
    }

    -- Add to processing queue
    eventAction:addEventToQueue(eventObj.event_id, eventObj)

    api.logging.LogInfo(string.format("Queued event: %s (%s)", eventType, eventObj.event_id))
end

-- Process motion detection events
function handleMotionDetection(detections)
    for _, detection in ipairs(detections) do
        local eventData = {
            detection_area = detection.area,
            bounding_box = {
                x = detection.x,
                y = detection.y,
                w = detection.w,
                h = detection.h
            },
            confidence = detection.confidence,
            zone = detection.zone or "unknown"
        }

        processAnalyticsEvent("motion_detected", eventData)
    end
end

Custom Script Actions

-- Custom event handler script (saved as separate .lua file)
-- This script is executed for script-type actions

function handleEvent(eventData)
    local eventType = eventData.event_type
    local metadata = eventData.metadata

    if eventType == "motion_detected" then
        return handleMotionEvent(eventData)
    elseif eventType == "object_detected" then
        return handleObjectEvent(eventData)
    elseif eventType == "zone_violation" then
        return handleZoneViolation(eventData)
    end

    return {success = true, message = "Event processed"}
end

function handleMotionEvent(eventData)
    local area = eventData.metadata.detection_area
    local zone = eventData.metadata.zone

    -- Custom logic for motion events
    if (area or 0) > 5000 and zone == "restricted" then  -- guard against a missing detection_area
        -- Trigger high-priority alert
        api.alerts.send({
            type = "security_alert",
            priority = "high",
            message = "Large motion detected in restricted zone",
            timestamp = eventData.timestamp
        })

        return {success = true, message = "Security alert triggered"}
    end

    return {success = true, message = "Motion event logged"}
end

Examples

Basic Event Processing System

#include <chrono>
#include <memory>
#include <string>
#include <vector>

#include "eventactionmanaged.h"
#include "rtcore.h"

// Basic event processing implementation
class EventProcessingSystem {
public:
    void initialize() {
        // Create event action processor
        eventAction_ = std::unique_ptr<EventActionManaged>(
            static_cast<EventActionManaged*>(
                EventActionManaged::create("event_processor").release()
            )
        );

        // Configure basic actions
        auto config = CValue::create();
        auto actions = CValue::createArray();

        // Disk storage action
        auto diskAction = CValue::create();
        diskAction->set("type", "disk");
        diskAction->set("uri", "/var/log/cvedia/events");
        diskAction->set("enabled", true);
        actions->append(diskAction);

        // REST API action
        auto restAction = CValue::create();
        restAction->set("type", "rest");
        restAction->set("uri", "https://api.monitoring.com/events");
        restAction->set("enabled", true);

        auto restParams = CValue::create();
        restParams->set("method", "POST");
        restParams->set("timeout", 30);
        restAction->set("parameters", restParams);

        actions->append(restAction);

        config->set("actions", actions);
        config->set("queue_size", 1000);
        config->set("worker_threads", 2);
        config->set("retry_attempts", 3);

        // Load configuration
        auto result = eventAction_->loadConfig(config);
        if (!result) {
            LOGE << "Failed to load event action configuration: " << result.error().message();
            return;
        }

        LOGI << "Event processing system initialized successfully";
    }

    void processEvent(const std::string& eventType, pCValue eventData) {
        // Create event object
        auto eventObj = CValue::create();
        eventObj->set("event_id", generateEventId());
        eventObj->set("event_type", eventType);
        eventObj->set("timestamp", getCurrentTimestamp());
        eventObj->set("source_id", "system");
        eventObj->set("metadata", eventData);

        // Process event
        auto result = eventAction_->process(eventObj);
        if (result) {
            LOGI << "Event processed successfully: " << eventType;
        } else {
            LOGE << "Failed to process event: " << result.error().message();
        }
    }

    void handleMotionDetection(const std::vector<MotionArea>& motionAreas) {
        for (const auto& area : motionAreas) {
            // Create motion event data
            auto motionData = CValue::create();
            motionData->set("area", area.area);
            motionData->set("confidence", area.confidence);
            motionData->set("x", area.boundingBox.x);
            motionData->set("y", area.boundingBox.y);
            motionData->set("w", area.boundingBox.width);
            motionData->set("h", area.boundingBox.height);

            // Process motion event
            processEvent("motion_detected", motionData);
        }
    }

private:
    std::unique_ptr<EventActionManaged> eventAction_;

    std::string generateEventId() {
        // Generate unique event ID
        return "evt_" + std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::steady_clock::now().time_since_epoch()).count());
    }

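    double getCurrentTimestamp() {
        // Wall-clock time in seconds since the Unix epoch; steady_clock has an
        // unspecified epoch and is unsuitable for event timestamps.
        return std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::system_clock::now().time_since_epoch()).count() / 1000.0;
    }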
};
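
`generateEventId()` in the example above derives the ID from a millisecond timestamp alone, so two events created within the same millisecond would share an ID. One way to avoid that, sketched here with a hypothetical `makeEventId` helper, is to append a process-wide sequence number. The ID format is an illustration, not something the plugin requires.

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <string>

// Builds "evt_<unix-ms>_<sequence>". The sequence number keeps IDs distinct
// even when several events are created within the same millisecond.
// Uniqueness holds within one process only.
inline std::string makeEventId() {
    static std::atomic<std::uint64_t> sequence{0};
    const auto nowMs = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::system_clock::now().time_since_epoch()).count();
    return "evt_" + std::to_string(nowMs) + "_" +
           std::to_string(sequence.fetch_add(1));
}
```

The atomic counter also makes the helper safe to call from several worker threads at once.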

Advanced Multi-Action Event System

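// Advanced event processing with multiple action types.
// The ComplexEvent type and helpers that are called but not defined here
// (setupEventFilters, configureActionSettings, determineEventPriority,
// enrichEventData, createRestAction, getWebhookSecret,
// processHighPriorityEvent, sendImmediateNotification) are application-specific.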
class AdvancedEventSystem {
public:
    void initializeAdvancedActions() {
        // Initialize event action system
        initializeEventActions();

        // Setup custom event filters
        setupEventFilters();

        // Configure action-specific settings
        configureActionSettings();

        LOGI << "Advanced event system initialized";
    }

    void processComplexEvent(const ComplexEvent& event) {
        // Determine event priority
        EventPriority priority = determineEventPriority(event);

        // Create enriched event data
        auto enrichedData = enrichEventData(event);

        // Route to appropriate actions based on priority
        routeEventByPriority(event.type, enrichedData, priority);
    }

private:
    std::unique_ptr<EventActionManaged> eventAction_;
    std::map<std::string, ActionConfig> actionConfigs_;

    enum class EventPriority {
        Low, Medium, High, Critical
    };

    void initializeEventActions() {
        eventAction_ = std::unique_ptr<EventActionManaged>(
            static_cast<EventActionManaged*>(
                EventActionManaged::create("advanced_processor").release()
            )
        );

        // Configure multiple action types
        auto config = createAdvancedConfiguration();
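        auto result = eventAction_->loadConfig(config);
        if (!result) {
            LOGE << "Failed to load advanced event action configuration: " << result.error().message();
        }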
    }

    pCValue createAdvancedConfiguration() {
        auto config = CValue::create();
        auto actions = CValue::createArray();

        // High-performance disk logging
        auto diskAction = CValue::create();
        diskAction->set("type", "disk");
        diskAction->set("uri", "/var/log/cvedia/events");
        diskAction->set("enabled", true);

        auto diskParams = CValue::create();
        diskParams->set("log_format", "json");
        diskParams->set("rotate_size", "500MB");
        diskParams->set("max_files", 20);
        diskParams->set("compress", true);
        diskAction->set("parameters", diskParams);
        actions->append(diskAction);

        // Multiple REST endpoints
        auto primaryRest = createRestAction("https://primary.api.com/events", "primary");
        auto backupRest = createRestAction("https://backup.api.com/events", "backup");
        actions->append(primaryRest);
        actions->append(backupRest);

        // Webhook for real-time notifications
        auto webhook = CValue::create();
        webhook->set("type", "webhook");
        webhook->set("uri", "https://hooks.alerting.com/cvedia");
        webhook->set("enabled", true);

        auto webhookParams = CValue::create();
        webhookParams->set("secret", getWebhookSecret());
        webhookParams->set("signature_header", "X-Cvedia-Signature");

        auto eventFilters = CValue::createArray();
        eventFilters->append(CValue::create("motion_detected"));
        eventFilters->append(CValue::create("zone_violation"));
        webhookParams->set("event_filters", eventFilters);

        webhook->set("parameters", webhookParams);
        actions->append(webhook);

        // Custom script for complex processing
        auto scriptAction = CValue::create();
        scriptAction->set("type", "script");
        scriptAction->set("uri", "/etc/cvedia/scripts/advanced_handler.lua");
        scriptAction->set("enabled", true);

        auto scriptParams = CValue::create();
        scriptParams->set("script_timeout", 120);
        scriptParams->set("max_memory", "100MB");
        scriptAction->set("parameters", scriptParams);
        actions->append(scriptAction);

        config->set("actions", actions);

        // Advanced processing configuration
        auto processing = CValue::create();
        processing->set("queue_size", 10000);
        processing->set("batch_size", 20);
        processing->set("batch_timeout", 2000);
        processing->set("worker_threads", 6);
        processing->set("retry_attempts", 5);
        processing->set("retry_delay", 2000);
        config->set("processing", processing);

        return config;
    }

    void routeEventByPriority(const std::string& eventType, pCValue eventData, EventPriority priority) {
        switch (priority) {
            case EventPriority::Critical:
                // Send to all endpoints immediately
                processCriticalEvent(eventType, eventData);
                break;

            case EventPriority::High:
                // Send to primary endpoints with high priority
                processHighPriorityEvent(eventType, eventData);
                break;

            case EventPriority::Medium:
                // Standard processing
                eventAction_->process(eventData);
                break;

            case EventPriority::Low:
                // Queue for batch processing
                eventAction_->addEventToQueue(eventData->get("event_id").getString(), eventData);
                break;
        }
    }

    void processCriticalEvent(const std::string& eventType, pCValue eventData) {
        // Bypass queue for critical events
        eventAction_->process(eventData);

        // Send immediate notifications
        sendImmediateNotification(eventType, eventData);

        // Log critical event
        LOGW << "CRITICAL EVENT: " << eventType << " processed immediately";
    }
};

Complete Event-Driven Security System

-- Complete event-driven security system
local eventAction = api.factory.eventaction.create(instance, "security_events")
local motion = api.factory.motion.create(instance, "security_motion")
local alerts = api.factory.alerts.create(instance, "security_alerts")

-- Security event configuration
local securityConfig = {
    event_retention_days = 30,
    high_priority_zones = {"entrance", "restricted", "perimeter"},
    notification_endpoints = {
        primary = "https://security.company.com/api/events",
        backup = "https://backup.security.com/api/events",
        sms = "https://sms.provider.com/api/send"
    },
    escalation_rules = {
        motion_threshold = 3,  -- events in 5 minutes
        motion_timeout = 300,  -- 5 minutes
        zone_violation_immediate = true
    }
}

-- Event statistics tracking
local eventStats = {
    total_events = 0,
    motion_events = 0,
    zone_violations = 0,
    alerts_sent = 0,
    failed_actions = 0
}

-- Initialize security event system
function initializeSecurityEventSystem()
    api.logging.LogInfo("Initializing security event processing system")

    -- Configure event actions
    eventAction:configure({
        actions = {
            -- Secure disk logging
            {
                type = "disk",
                uri = "/var/log/security/events",
                enabled = true,
                parameters = {
                    log_format = "json",
                    rotate_size = "100MB",
                    max_files = 50,  -- size this to cover event_retention_days (30) at your event volume
                    compress = true,
                    filename_pattern = "security_%Y%m%d_%H.log"
                }
            },

            -- Primary security API
            {
                type = "rest",
                uri = securityConfig.notification_endpoints.primary,
                enabled = true,
                parameters = {
                    method = "POST",
                    timeout = 15,
                    retry_attempts = 5,
                    headers = {
                        ["Authorization"] = "Bearer " .. api.config.get("security_api_token"),
                        ["Content-Type"] = "application/json",
                        ["X-System-ID"] = "cvedia-rt-security"
                    }
                }
            },

            -- Backup security API
            {
                type = "rest",
                uri = securityConfig.notification_endpoints.backup,
                enabled = true,
                parameters = {
                    method = "POST",
                    timeout = 30,
                    retry_attempts = 3
                }
            },

            -- Real-time webhook for immediate alerts
            {
                type = "webhook",
                uri = "https://hooks.security.com/cvedia-alerts",
                enabled = true,
                parameters = {
                    secret = api.config.get("webhook_secret"),
                    signature_header = "X-Security-Signature",
                    event_filters = {"zone_violation", "multiple_motion_events"}
                }
            },

            -- Custom security script
            {
                type = "script",
                uri = "/etc/cvedia/security/event_handler.lua",
                enabled = true,
                parameters = {
                    script_timeout = 60,
                    max_memory = "50MB"
                }
            }
        },

        processing = {
            queue_size = 5000,
            batch_size = 5,
            batch_timeout = 1000,
            worker_threads = 4,
            retry_attempts = 5,
            retry_delay = 2000
        },

        filtering = {
            event_types = {"motion_detected", "zone_violation", "tampering_detected"},
            min_confidence = 0.6,
            zone_filters = securityConfig.high_priority_zones
        }
    })

    -- Load event action configuration
    local success = eventAction:loadConfig(eventAction.config)
    if not success then
        api.logging.LogError("Failed to initialize security event system")
        return false
    end

    -- Setup event monitoring
    setupEventMonitoring()

    -- Start event statistics tracking
    startEventStatisticsTracking()

    api.logging.LogInfo("Security event system initialized successfully")
    return true
end

-- Process security events from motion detection
function processSecurityMotionEvent(detection)
    eventStats.total_events = eventStats.total_events + 1
    eventStats.motion_events = eventStats.motion_events + 1

    local currentTime = os.time()

    -- Create comprehensive event data
    local eventData = {
        event_id = api.system.generateUUID(),
        event_type = "motion_detected",
        timestamp = currentTime,
        source_id = instance.name,
        confidence = detection.confidence,
        metadata = {
            detection_area = detection.area,
            bounding_box = {
                x = detection.x,
                y = detection.y,
                w = detection.w,
                h = detection.h
            },
            zone = determineSecurityZone(detection),
            camera_location = api.config.get("camera_location", "unknown"),
            system_info = {
                version = "1.0.0",  -- Replace with actual version retrieval
                uptime = os.time()   -- Replace with actual uptime if available
            }
        }
    }

    -- Determine event priority
    local priority = determineEventPriority(eventData)

    -- Process based on priority
    if priority == "critical" then
        processCriticalSecurityEvent(eventData)
    elseif priority == "high" then
        processHighPriorityEvent(eventData)
    else
        processStandardEvent(eventData)
    end
end

-- Process critical security events
function processCriticalSecurityEvent(eventData)
    api.logging.LogWarning("CRITICAL SECURITY EVENT: " .. (eventData.event_type or "unknown"))

    -- Immediate processing (bypass queue)
    eventAction:process(eventData)

    -- Send immediate SMS alert
    sendImmediateSMSAlert(eventData)

    -- Note: Emergency recording would require implementation
    -- Actual implementation depends on storage plugin availability
    api.logging.LogInfo("Emergency recording requested for event: " .. eventData.event_id)

    eventStats.alerts_sent = eventStats.alerts_sent + 1
end

-- Determine event priority based on various factors
function determineEventPriority(eventData)
    local zone = eventData.metadata.zone
    local area = eventData.metadata.detection_area
    local confidence = eventData.confidence

    -- Critical: High confidence in restricted zones
    if confidence > 0.9 and isHighSecurityZone(zone) then
        return "critical"
    end

    -- High: Large movement in monitored areas
    if area > 10000 and confidence > 0.7 then
        return "high"
    end

    -- Check for multiple events (escalation)
    if checkEventEscalation(eventData) then
        return "high"
    end

    return "standard"
end

-- Check for event escalation patterns
function checkEventEscalation(eventData)
    local currentTime = eventData.timestamp
    local recentEvents = getRecentEvents(currentTime - securityConfig.escalation_rules.motion_timeout)

    -- Count recent motion events
    local motionCount = 0
    for _, event in ipairs(recentEvents) do
        if event.event_type == "motion_detected" then
            motionCount = motionCount + 1
        end
    end

    -- Escalate if too many motion events
    if motionCount >= securityConfig.escalation_rules.motion_threshold then
        api.logging.LogWarning("Event escalation triggered: " .. motionCount .. " motion events in " .. securityConfig.escalation_rules.motion_timeout .. " seconds")

        -- Create escalation event
        local escalationEvent = {
            event_id = "evt_" .. os.time() .. "_" .. math.random(1000, 9999),
            event_type = "multiple_motion_events",
            timestamp = currentTime,
            source_id = instance.name,
            metadata = {
                escalation_reason = "multiple_motion_threshold",
                event_count = motionCount,
                time_window = securityConfig.escalation_rules.motion_timeout,
                related_events = extractEventIds(recentEvents)
            }
        }

        eventAction:process(escalationEvent)
        return true
    end

    return false
end

-- Send immediate SMS alert for critical events
function sendImmediateSMSAlert(eventData)
    local smsData = {
        to = api.config.get("security_phone"),
        message = string.format(
            "SECURITY ALERT: %s detected at %s. Zone: %s. Confidence: %.2f",
            eventData.event_type,
            os.date("%H:%M:%S", eventData.timestamp),
            eventData.metadata.zone,
            eventData.confidence
        ),
        priority = "urgent"
    }

    api.http.post(securityConfig.notification_endpoints.sms, smsData, {
        timeout = 10,
        headers = {
            ["Authorization"] = "Bearer " .. api.config.get("sms_api_token")
        }
    })
end

-- Monitor event processing health
function setupEventMonitoring()
    -- Implement periodic health checks in main loop
    -- Check every 60 seconds
    local function performHealthCheck()
        local queueStats = eventAction:getQueueStats()
        if queueStats then
            local queueSize = queueStats.current_size or 0
            local maxSize = queueStats.max_size or 1000

            if queueSize > maxSize * 0.8 then
                api.logging.LogWarning("Event queue nearly full: " .. queueSize .. "/" .. maxSize .. " events")
            end

            -- Log processing statistics
            api.logging.LogDebug("Event Processing: Queue: " .. queueSize .. ", Processed: " .. (queueStats.processed_count or 0) .. ", Failed: " .. (queueStats.failed_count or 0))
        end
    end

    -- Run one check now; periodic scheduling is left to the host application
    performHealthCheck()
end

-- Track event statistics
function startEventStatisticsTracking()
    -- Implement periodic statistics logging in main loop
    -- Log every 5 minutes (300 seconds)
    local function logStatistics()
        api.logging.LogInfo("Security Event Statistics:")
        api.logging.LogInfo("  Total Events: " .. (eventStats.total_events or 0))
        api.logging.LogInfo("  Motion Events: " .. (eventStats.motion_events or 0))
        api.logging.LogInfo("  Zone Violations: " .. (eventStats.zone_violations or 0))
        api.logging.LogInfo("  Alerts Sent: " .. (eventStats.alerts_sent or 0))
        api.logging.LogInfo("  Failed Actions: " .. (eventStats.failed_actions or 0))

        -- Reset counters periodically for rate calculation
        if (eventStats.total_events or 0) > 10000 then
            eventStats = {
                total_events = 0,
                motion_events = 0,
                zone_violations = 0,
                alerts_sent = 0,
                failed_actions = 0
            }
        end
    end

    -- Return the logger so the main loop can call it on its own schedule
    return logStatistics
end

-- Initialize the security event system
initializeSecurityEventSystem()

-- info("Security event processing system is active")

Best Practices

Performance Optimization

  • Queue Management: Configure appropriate queue sizes based on expected event volume
  • Batch Processing: Use event batching for high-volume scenarios to improve throughput
  • Worker Threads: Scale worker threads based on available CPU cores and I/O requirements
  • Timeout Configuration: Set reasonable timeouts to prevent hanging operations

Reliability Guidelines

  • Retry Logic: Implement exponential backoff for retry attempts
  • Error Handling: Handle network failures and service unavailability gracefully
  • Monitoring: Monitor queue sizes and processing rates for system health
  • Failover: Configure backup endpoints for critical event delivery
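
The retry guidance above can be illustrated with a small standalone sketch. It is not part of the EventAction API: `backoffDelay` and `retryWithBackoff` are hypothetical helper names, and the 2^20 cap on the multiplier is an assumption chosen only to keep the arithmetic from overflowing.

```cpp
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <thread>

using Millis = std::chrono::milliseconds;

// Hypothetical helper: delay before retry number `attempt` (0-based),
// computed as base * 2^attempt and capped at maxDelay.
Millis backoffDelay(unsigned attempt, Millis base, Millis maxDelay) {
    // Cap the exponent so the multiplier cannot overflow.
    const unsigned shift = std::min(attempt, 20u);
    const std::int64_t raw =
        static_cast<std::int64_t>(base.count()) * (std::int64_t{1} << shift);
    return Millis(std::min<std::int64_t>(raw, maxDelay.count()));
}

// Hypothetical helper: calls `op` until it returns true or maxAttempts is
// reached, sleeping with exponential backoff between failed attempts.
template <typename Op>
bool retryWithBackoff(Op&& op, unsigned maxAttempts, Millis base, Millis maxDelay) {
    for (unsigned attempt = 0; attempt < maxAttempts; ++attempt) {
        if (op()) {
            return true;
        }
        if (attempt + 1 < maxAttempts) {
            std::this_thread::sleep_for(backoffDelay(attempt, base, maxDelay));
        }
    }
    return false;
}
```

With a 100 ms base and a 5 s cap, the delays would be 100, 200, 400, 800 ms and so on until the cap is reached. Production code would usually add random jitter so that many clients do not retry in lockstep.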

Security Considerations

  • Authentication: Use secure authentication methods for REST API and webhook endpoints
  • Encryption: Ensure HTTPS/TLS for all network communications
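  • Secret Management: Keep API keys and secrets out of scripts and committed configuration files; load them at runtime, for example from environment variables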
  • Event Filtering: Implement proper event filtering to avoid information leakage
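
As one way to keep secrets out of configuration files, a host application could read them from the process environment. The sketch below uses only the standard library; `readSecret` is a hypothetical helper, not a CVEDIA-RT function, and the variable name in the usage note is invented for illustration.

```cpp
#include <cstdlib>
#include <optional>
#include <string>

// Hypothetical helper: returns the value of an environment variable,
// or std::nullopt when it is unset or empty.
std::optional<std::string> readSecret(const char* name) {
    const char* value = std::getenv(name);
    if (value == nullptr || *value == '\0') {
        return std::nullopt;
    }
    return std::string(value);
}
```

A caller might use `readSecret("EVENTACTION_API_TOKEN")` (an assumed name) and refuse to enable the REST action when the result is empty, rather than falling back to a hard-coded default.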

Integration Guidelines

  • Event Schema: Use consistent event schemas across all actions
  • Idempotency: Ensure event processing is idempotent for retry scenarios
  • Logging: Implement comprehensive logging for audit and debugging
  • Configuration Management: Use environment variables for sensitive configuration
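
Idempotent handling is often approximated by remembering which event IDs have already been processed, so that a retried delivery becomes a no-op. The following is a minimal sketch under that assumption; `SeenEvents` is a hypothetical class, and it relies on each event carrying a unique ID such as the `event_id` field used elsewhere on this page.

```cpp
#include <string>
#include <unordered_set>

// Hypothetical helper: tracks event IDs that have already been handled.
class SeenEvents {
public:
    // Returns true the first time an ID is recorded, false on any repeat.
    bool markIfNew(const std::string& eventId) {
        return seen_.insert(eventId).second;
    }

private:
    std::unordered_set<std::string> seen_;
};
```

A handler would call `markIfNew` before performing side effects and skip the event when it returns false. The set grows without bound in this form, so a real deployment would likely expire old IDs or persist them across restarts.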

Troubleshooting

Common Issues

High Queue Usage

// Monitor and adjust queue configuration
auto queueStats = eventAction->getQueueStats();
if (queueStats && queueStats->get("utilization").getFloat() > 0.8f) {
    // Increase queue size or worker threads
    auto config = eventAction->getConfig();
    config->set("queue_size", 5000);
    config->set("worker_threads", 4);
    eventAction->loadConfig(config);
}

Failed REST API Calls

  • Network Connectivity: Verify network access to endpoints
  • Authentication: Check API credentials and token expiration
  • Rate Limiting: Implement proper rate limiting and retry logic
  • SSL/TLS Issues: Verify certificate validity and SSL configuration
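
For the rate limiting point above, a token bucket is one common client-side approach. This sketch is independent of the plugin: `TokenBucket` is a hypothetical class, and the caller supplies a monotonic timestamp in seconds so that the logic stays easy to test.

```cpp
#include <algorithm>

// Hypothetical client-side limiter: allows bursts of up to `capacity`
// requests and refills at `refillPerSecond` tokens per second.
class TokenBucket {
public:
    TokenBucket(double capacity, double refillPerSecond)
        : capacity_(capacity), refillPerSecond_(refillPerSecond), tokens_(capacity) {}

    // `nowSeconds` is a monotonic timestamp provided by the caller.
    // Returns true and consumes one token if a request may be sent now.
    bool tryAcquire(double nowSeconds) {
        const double elapsed = std::max(0.0, nowSeconds - last_);
        tokens_ = std::min(capacity_, tokens_ + elapsed * refillPerSecond_);
        last_ = std::max(last_, nowSeconds);
        if (tokens_ < 1.0) {
            return false;
        }
        tokens_ -= 1.0;
        return true;
    }

private:
    double capacity_;
    double refillPerSecond_;
    double tokens_;
    double last_ = 0.0;
};
```

When `tryAcquire` returns false, the request can be deferred or left in the queue instead of being sent, which helps avoid tripping the remote endpoint's own limits.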

Script Execution Failures

  • Script Syntax: Validate Lua script syntax and dependencies
  • Memory Limits: Monitor script memory usage and adjust limits
  • Timeout Issues: Increase script timeout for complex processing
  • Permission Issues: Ensure script files have proper read permissions

Disk Storage Issues

  • Disk Space: Monitor available disk space for log storage
  • File Permissions: Verify write permissions for log directories
  • Log Rotation: Configure log rotation so that event logs cannot fill the disk
  • I/O Performance: Monitor disk I/O performance under high load
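
The disk space check can be sketched with the C++17 standard library. `hasFreeSpace` is a hypothetical helper, not a plugin function, and the threshold is whatever the deployment considers safe.

```cpp
#include <cstdint>
#include <filesystem>
#include <system_error>

// Hypothetical helper: returns true when the filesystem that holds `dir`
// has at least `minFreeBytes` available to the current process.
bool hasFreeSpace(const std::filesystem::path& dir, std::uintmax_t minFreeBytes) {
    std::error_code ec;
    const std::filesystem::space_info info = std::filesystem::space(dir, ec);
    if (ec) {
        // Treat an unreadable or missing path as "not enough space".
        return false;
    }
    return info.available >= minFreeBytes;
}
```

A monitoring loop could call this against the log directory and raise a warning, or pause disk actions, when it returns false.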

Debugging Tools

// Event action diagnostics
void diagnoseEventActions(EventActionManaged* eventAction) {
    // Get queue statistics
    auto queueStats = eventAction->getQueueStats();
    if (queueStats) {
        LOGI << "Queue Statistics:";
        LOGI << "  Current Size: " << queueStats->get("current_size").getInt();
        LOGI << "  Max Size: " << queueStats->get("max_size").getInt();
        LOGI << "  Processed Count: " << queueStats->get("processed_count").getInt();
        LOGI << "  Failed Count: " << queueStats->get("failed_count").getInt();
    }

    // Get action configurations
    auto actions = eventAction->getEventActions();
    if (actions) {
        LOGI << "Configured Actions:";
        for (const auto& action : actions.value()) {
            auto type = action->get("type").getString();
            auto uri = action->get("uri").getString();
            auto enabled = action->get("enabled").getBool();
            LOGI << "  Action: " << type << " -> " << uri << " (enabled: " << enabled << ")";
        }
    }
}

Integration Examples

Video Management System Integration

// Complete VMS integration with event actions
class VMSEventIntegration {
public:
    void initialize() {
        // Initialize event processing
        initializeEventActions();

        // Setup VMS-specific actions
        setupVMSActions();

        // Configure event routing
        setupEventRouting();
    }

    void processVMSEvent(const VMSEvent& event) {
        // Convert VMS event to standard format
        auto eventData = convertVMSEvent(event);

        // Route based on event type and priority
        routeVMSEvent(eventData);

        // Update VMS database
        updateVMSDatabase(event);
    }

private:
    void setupVMSActions() {
        // Configure VMS-specific REST endpoints
        // Configure database logging
        // Setup alert escalation
    }

    pCValue convertVMSEvent(const VMSEvent& event) {
        auto eventData = CValue::create();
        eventData->set("event_id", event.id);
        eventData->set("event_type", event.type);
        eventData->set("timestamp", event.timestamp);
        eventData->set("camera_id", event.cameraId);
        eventData->set("metadata", event.metadata);
        return eventData;
    }
};

See Also