EventAction Plugin¶
Description¶
EventAction is an event processing and response plugin for CVEDIA-RT that handles event-driven actions and notifications. It provides a comprehensive framework for responding to analytics events with configurable actions including disk storage, REST API calls, webhook notifications, and custom event handling scripts.
This plugin serves as the central event processing hub for CVEDIA-RT systems, enabling automated responses to detection events, system alerts, and custom analytics triggers. It supports multiple action types, event queuing, script execution, and integration with external systems for comprehensive event management and notification workflows.
Key Features¶
- Multiple Action Types: Support for disk storage, REST API, webhook, and custom script actions
- Event Queue Management: Reliable event queuing and processing with retry mechanisms
- Script Execution: Custom Lua script execution for complex event handling logic
- REST API Integration: HTTP/HTTPS REST API calls for external system integration
- Disk Storage: Persistent event logging and data storage capabilities
- Event Filtering: Configurable event filtering and routing based on event properties
- Asynchronous Processing: Non-blocking event processing for high-performance operation
- Error Handling: Robust error handling and recovery mechanisms
- Event Batching: Support for batching multiple events for efficient processing
- Configuration Reloading: Dynamic configuration updates without system restart
Requirements¶
Hardware Requirements¶
- CPU: Multi-core processor for concurrent event processing
- Memory: Minimum 1GB RAM (2GB+ recommended for high-volume event processing)
- Storage: Sufficient disk space for event logs and queued events
- Network: Network connectivity for REST API and webhook actions
Software Dependencies¶
- RTCORE: CVEDIA-RT core library for plugin infrastructure
- HTTP Client Library: HTTP/HTTPS client for REST API and webhook calls
- JSON Library: JSON parsing and generation for event data
- File System Libraries: File I/O operations for disk storage actions
- Threading Library: Multi-threading support for concurrent event processing
- Lua Runtime: Lua scripting engine for custom event handlers
Network Requirements¶
- HTTP/HTTPS Access: Outbound network access for REST API and webhook actions
- DNS Resolution: DNS resolution for external service endpoints
- Firewall Configuration: Appropriate firewall rules for outbound connections
- SSL/TLS Support: SSL/TLS support for secure HTTPS connections
Configuration¶
Basic Configuration¶
{
"eventaction": {
"actions": [
{
"type": "disk",
"uri": "/var/log/cvedia/events",
"enabled": true
},
{
"type": "rest",
"uri": "https://api.example.com/events",
"enabled": true
}
],
"queue_size": 1000,
"retry_attempts": 3,
"timeout": 30
}
}
Advanced Configuration¶
{
"eventaction": {
"actions": [
{
"type": "disk",
"uri": "/var/log/cvedia/events",
"enabled": true,
"parameters": {
"log_format": "json",
"rotate_size": "100MB",
"max_files": 10,
"compress": true
}
},
{
"type": "rest",
"uri": "https://api.example.com/events",
"enabled": true,
"parameters": {
"method": "POST",
"content_type": "application/json",
"timeout": 30,
"retry_attempts": 3,
"headers": {
"Authorization": "Bearer ${API_TOKEN}",
"X-Client-ID": "cvedia-rt"
}
}
},
{
"type": "webhook",
"uri": "https://hooks.example.com/cvedia",
"enabled": true,
"parameters": {
"secret": "${WEBHOOK_SECRET}",
"signature_header": "X-Hub-Signature",
"event_filters": ["motion_detected", "object_detected"]
}
},
{
"type": "script",
"uri": "/etc/cvedia/scripts/custom_handler.lua",
"enabled": true,
"parameters": {
"script_timeout": 60,
"max_memory": "50MB"
}
}
],
"processing": {
"queue_size": 5000,
"batch_size": 10,
"batch_timeout": 1000,
"worker_threads": 4,
"retry_attempts": 3,
"retry_delay": 1000
},
"filtering": {
"event_types": ["motion_detected", "object_detected", "zone_violation"],
"min_confidence": 0.7,
"zone_filters": ["entrance", "restricted_area"]
}
}
}
Configuration Schema¶
Parameter | Type | Default | Description |
---|---|---|---|
actions |
array | [] | List of event action configurations |
actions[].type |
string | required | Action type ("disk", "rest", "webhook", "script") |
actions[].uri |
string | required | Action URI or endpoint |
actions[].enabled |
bool | true | Enable/disable this action |
actions[].parameters |
object | {} | Action-specific parameters |
queue_size |
int | 1000 | Maximum number of queued events |
batch_size |
int | 1 | Number of events to batch together |
batch_timeout |
int | 1000 | Timeout for batching events (ms) |
worker_threads |
int | 2 | Number of worker threads for processing |
retry_attempts |
int | 3 | Number of retry attempts for failed actions |
retry_delay |
int | 1000 | Delay between retry attempts (ms) |
timeout |
int | 30 | Default timeout for actions (seconds) |
API Reference¶
C++ API (EventActionManaged)¶
Core Event Processing¶
class EventActionManaged : public ModuleImpl {
public:
// Configuration and initialization
expected<void> loadConfig(pCValue config) override;
expected<pCValue> getConfigDescriptors() override;
// Event processing
expected<void> process(pCValue eventObj);
expected<void> addEventToQueue(const std::string& eventId, pCValue eventData);
expected<cvec> getEventActions();
// Script processing
expected<pCValue> processScript(pCValue config, pCValue data);
// Queue management
expected<size_t> getQueueSize();
expected<void> clearQueue();
expected<pCValue> getQueueStats();
};
Action Configuration Structure¶
struct ActionConfig {
std::string type; // Action type ("disk", "rest", "webhook", "script")
std::string uri; // Action URI or endpoint
bool enabled = true; // Enable/disable action
pCValue parameters; // Action-specific parameters
int timeout = 30; // Action timeout (seconds)
int retry_attempts = 3; // Number of retry attempts
};
struct EventActionConfig {
std::vector<ActionConfig> actions; // List of configured actions
size_t queue_size = 1000; // Event queue size
size_t batch_size = 1; // Event batching size
int batch_timeout = 1000; // Batching timeout (ms)
int worker_threads = 2; // Number of worker threads
int retry_attempts = 3; // Default retry attempts
int retry_delay = 1000; // Delay between retries (ms)
};
Event Data Structure¶
struct EventData {
std::string event_id; // Unique event identifier
std::string event_type; // Type of event ("motion_detected", etc.)
double timestamp; // Event timestamp
pCValue metadata; // Event metadata and properties
std::string source_id; // Source identifier (camera, sensor, etc.)
float confidence; // Event confidence score
pCValue location_data; // Spatial location information
};
Action-Specific APIs¶
Disk Storage Action¶
class EventActionDisk {
public:
struct DiskActionConfig {
std::string log_format = "json"; // Log format ("json", "csv", "text")
std::string rotate_size = "100MB"; // Log rotation size
int max_files = 10; // Maximum log files to keep
bool compress = true; // Compress rotated files
std::string filename_pattern = "events_%Y%m%d_%H%M%S.log";
};
expected<void> writeEvent(const EventData& event, const DiskActionConfig& config);
expected<void> rotateLogFiles();
};
REST API Action¶
class EventActionRest {
public:
struct RestActionConfig {
std::string method = "POST"; // HTTP method
std::string content_type = "application/json"; // Content type
std::map<std::string, std::string> headers; // HTTP headers
int timeout = 30; // Request timeout (seconds)
bool verify_ssl = true; // Verify SSL certificates
};
expected<void> sendEvent(const EventData& event, const RestActionConfig& config);
expected<void> sendBatchedEvents(const std::vector<EventData>& events, const RestActionConfig& config);
};
Webhook Action¶
class EventActionWebhook {
public:
struct WebhookConfig {
std::string secret; // Webhook secret for signature
std::string signature_header = "X-Hub-Signature"; // Signature header name
std::vector<std::string> event_filters; // Event type filters
std::string payload_template; // Custom payload template
};
expected<void> sendWebhook(const EventData& event, const WebhookConfig& config);
std::string generateSignature(const std::string& payload, const std::string& secret);
};
Lua API¶
Event Action Setup¶
-- Create event action instance
local eventAction = api.factory.eventaction.create(instance, "event_processor")
-- Configure event actions
eventAction:configure({
actions = {
{
type = "disk",
uri = "/var/log/events",
enabled = true,
parameters = {
log_format = "json",
rotate_size = "50MB",
max_files = 5
}
},
{
type = "rest",
uri = "https://api.monitoring.com/events",
enabled = true,
parameters = {
method = "POST",
timeout = 30,
headers = {
["Authorization"] = "Bearer " .. api.config.get("api_token"),
["Content-Type"] = "application/json"
}
}
}
},
queue_size = 2000,
worker_threads = 3,
retry_attempts = 5
})
-- Load configuration
local success = eventAction:loadConfig(eventAction.config)
if success then
api.logging.LogInfo("Event action system initialized successfully")
end
Event Processing¶
-- Process individual events
function processAnalyticsEvent(eventType, eventData)
local eventObj = {
event_id = "evt_" .. os.time() .. "_" .. math.random(1000, 9999),
event_type = eventType,
timestamp = os.time(),
source_id = instance.name,
metadata = eventData,
confidence = eventData.confidence or 1.0
}
-- Add to processing queue
eventAction:addEventToQueue(eventObj.event_id, eventObj)
api.logging.LogInfo(string.format("Queued event: %s (%s)", eventType, eventObj.event_id))
end
-- Process motion detection events
function handleMotionDetection(detections)
for _, detection in ipairs(detections) do
local eventData = {
detection_area = detection.area,
bounding_box = {
x = detection.x,
y = detection.y,
w = detection.w,
h = detection.h
},
confidence = detection.confidence,
zone = detection.zone or "unknown"
}
processAnalyticsEvent("motion_detected", eventData)
end
end
Custom Script Actions¶
-- Custom event handler script (saved as separate .lua file)
-- This script is executed for script-type actions
function handleEvent(eventData)
local eventType = eventData.event_type
local metadata = eventData.metadata
if eventType == "motion_detected" then
return handleMotionEvent(eventData)
elseif eventType == "object_detected" then
return handleObjectEvent(eventData)
elseif eventType == "zone_violation" then
return handleZoneViolation(eventData)
end
return {success = true, message = "Event processed"}
end
function handleMotionEvent(eventData)
local area = eventData.metadata.detection_area
local zone = eventData.metadata.zone
-- Custom logic for motion events
if area > 5000 and zone == "restricted" then
-- Trigger high-priority alert
api.alerts.send({
type = "security_alert",
priority = "high",
message = "Large motion detected in restricted zone",
timestamp = eventData.timestamp
})
return {success = true, message = "Security alert triggered"}
end
return {success = true, message = "Motion event logged"}
end
Examples¶
Basic Event Processing System¶
#include "eventactionmanaged.h"
#include "rtcore.h"
// Basic event processing implementation
class EventProcessingSystem {
public:
void initialize() {
// Create event action processor
eventAction_ = std::unique_ptr<EventActionManaged>(
static_cast<EventActionManaged*>(
EventActionManaged::create("event_processor").release()
)
);
// Configure basic actions
auto config = CValue::create();
auto actions = CValue::createArray();
// Disk storage action
auto diskAction = CValue::create();
diskAction->set("type", "disk");
diskAction->set("uri", "/var/log/cvedia/events");
diskAction->set("enabled", true);
actions->append(diskAction);
// REST API action
auto restAction = CValue::create();
restAction->set("type", "rest");
restAction->set("uri", "https://api.monitoring.com/events");
restAction->set("enabled", true);
auto restParams = CValue::create();
restParams->set("method", "POST");
restParams->set("timeout", 30);
restAction->set("parameters", restParams);
actions->append(restAction);
config->set("actions", actions);
config->set("queue_size", 1000);
config->set("worker_threads", 2);
config->set("retry_attempts", 3);
// Load configuration
auto result = eventAction_->loadConfig(config);
if (!result) {
LOGE << "Failed to load event action configuration: " << result.error().message();
return;
}
LOGI << "Event processing system initialized successfully";
}
void processEvent(const std::string& eventType, pCValue eventData) {
// Create event object
auto eventObj = CValue::create();
eventObj->set("event_id", generateEventId());
eventObj->set("event_type", eventType);
eventObj->set("timestamp", getCurrentTimestamp());
eventObj->set("source_id", "system");
eventObj->set("metadata", eventData);
// Process event
auto result = eventAction_->process(eventObj);
if (result) {
LOGI << "Event processed successfully: " << eventType;
} else {
LOGE << "Failed to process event: " << result.error().message();
}
}
void handleMotionDetection(const std::vector<MotionArea>& motionAreas) {
for (const auto& area : motionAreas) {
// Create motion event data
auto motionData = CValue::create();
motionData->set("area", area.area);
motionData->set("confidence", area.confidence);
motionData->set("x", area.boundingBox.x);
motionData->set("y", area.boundingBox.y);
motionData->set("w", area.boundingBox.width);
motionData->set("h", area.boundingBox.height);
// Process motion event
processEvent("motion_detected", motionData);
}
}
private:
std::unique_ptr<EventActionManaged> eventAction_;
std::string generateEventId() {
// Generate unique event ID
return "evt_" + std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count());
}
double getCurrentTimestamp() {
return std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count() / 1000.0;
}
};
Advanced Multi-Action Event System¶
// Advanced event processing with multiple action types
class AdvancedEventSystem {
public:
void initializeAdvancedActions() {
// Initialize event action system
initializeEventActions();
// Setup custom event filters
setupEventFilters();
// Configure action-specific settings
configureActionSettings();
LOGI << "Advanced event system initialized";
}
void processComplexEvent(const ComplexEvent& event) {
// Determine event priority
EventPriority priority = determineEventPriority(event);
// Create enriched event data
auto enrichedData = enrichEventData(event);
// Route to appropriate actions based on priority
routeEventByPriority(event.type, enrichedData, priority);
}
private:
std::unique_ptr<EventActionManaged> eventAction_;
std::map<std::string, ActionConfig> actionConfigs_;
enum class EventPriority {
Low, Medium, High, Critical
};
void initializeEventActions() {
eventAction_ = std::unique_ptr<EventActionManaged>(
static_cast<EventActionManaged*>(
EventActionManaged::create("advanced_processor").release()
)
);
// Configure multiple action types
auto config = createAdvancedConfiguration();
eventAction_->loadConfig(config);
}
pCValue createAdvancedConfiguration() {
auto config = CValue::create();
auto actions = CValue::createArray();
// High-performance disk logging
auto diskAction = CValue::create();
diskAction->set("type", "disk");
diskAction->set("uri", "/var/log/cvedia/events");
diskAction->set("enabled", true);
auto diskParams = CValue::create();
diskParams->set("log_format", "json");
diskParams->set("rotate_size", "500MB");
diskParams->set("max_files", 20);
diskParams->set("compress", true);
diskAction->set("parameters", diskParams);
actions->append(diskAction);
// Multiple REST endpoints
auto primaryRest = createRestAction("https://primary.api.com/events", "primary");
auto backupRest = createRestAction("https://backup.api.com/events", "backup");
actions->append(primaryRest);
actions->append(backupRest);
// Webhook for real-time notifications
auto webhook = CValue::create();
webhook->set("type", "webhook");
webhook->set("uri", "https://hooks.alerting.com/cvedia");
webhook->set("enabled", true);
auto webhookParams = CValue::create();
webhookParams->set("secret", getWebhookSecret());
webhookParams->set("signature_header", "X-Cvedia-Signature");
auto eventFilters = CValue::createArray();
eventFilters->append(CValue::create("motion_detected"));
eventFilters->append(CValue::create("zone_violation"));
webhookParams->set("event_filters", eventFilters);
webhook->set("parameters", webhookParams);
actions->append(webhook);
// Custom script for complex processing
auto scriptAction = CValue::create();
scriptAction->set("type", "script");
scriptAction->set("uri", "/etc/cvedia/scripts/advanced_handler.lua");
scriptAction->set("enabled", true);
auto scriptParams = CValue::create();
scriptParams->set("script_timeout", 120);
scriptParams->set("max_memory", "100MB");
scriptAction->set("parameters", scriptParams);
actions->append(scriptAction);
config->set("actions", actions);
// Advanced processing configuration
auto processing = CValue::create();
processing->set("queue_size", 10000);
processing->set("batch_size", 20);
processing->set("batch_timeout", 2000);
processing->set("worker_threads", 6);
processing->set("retry_attempts", 5);
processing->set("retry_delay", 2000);
config->set("processing", processing);
return config;
}
void routeEventByPriority(const std::string& eventType, pCValue eventData, EventPriority priority) {
switch (priority) {
case EventPriority::Critical:
// Send to all endpoints immediately
processCriticalEvent(eventType, eventData);
break;
case EventPriority::High:
// Send to primary endpoints with high priority
processHighPriorityEvent(eventType, eventData);
break;
case EventPriority::Medium:
// Standard processing
eventAction_->process(eventData);
break;
case EventPriority::Low:
// Queue for batch processing
eventAction_->addEventToQueue(eventData->get("event_id").getString(), eventData);
break;
}
}
void processCriticalEvent(const std::string& eventType, pCValue eventData) {
// Bypass queue for critical events
eventAction_->process(eventData);
// Send immediate notifications
sendImmediateNotification(eventType, eventData);
// Log critical event
LOGW << "CRITICAL EVENT: " << eventType << " processed immediately";
}
};
Complete Event-Driven Security System¶
-- Complete event-driven security system
local eventAction = api.factory.eventaction.create(instance, "security_events")
local motion = api.factory.motion.create(instance, "security_motion")
local alerts = api.factory.alerts.create(instance, "security_alerts")
-- Security event configuration
local securityConfig = {
event_retention_days = 30,
high_priority_zones = {"entrance", "restricted", "perimeter"},
notification_endpoints = {
primary = "https://security.company.com/api/events",
backup = "https://backup.security.com/api/events",
sms = "https://sms.provider.com/api/send"
},
escalation_rules = {
motion_threshold = 3, -- events in 5 minutes
motion_timeout = 300, -- 5 minutes
zone_violation_immediate = true
}
}
-- Event statistics tracking
local eventStats = {
total_events = 0,
motion_events = 0,
zone_violations = 0,
alerts_sent = 0,
failed_actions = 0
}
-- Initialize security event system
function initializeSecurityEventSystem()
api.logging.LogInfo("Initializing security event processing system")
-- Configure event actions
eventAction:configure({
actions = {
-- Secure disk logging
{
type = "disk",
uri = "/var/log/security/events",
enabled = true,
parameters = {
log_format = "json",
rotate_size = "100MB",
max_files = 50, -- 30 days retention
compress = true,
filename_pattern = "security_%Y%m%d_%H.log"
}
},
-- Primary security API
{
type = "rest",
uri = securityConfig.notification_endpoints.primary,
enabled = true,
parameters = {
method = "POST",
timeout = 15,
retry_attempts = 5,
headers = {
["Authorization"] = "Bearer " .. api.config.get("security_api_token"),
["Content-Type"] = "application/json",
["X-System-ID"] = "cvedia-rt-security"
}
}
},
-- Backup security API
{
type = "rest",
uri = securityConfig.notification_endpoints.backup,
enabled = true,
parameters = {
method = "POST",
timeout = 30,
retry_attempts = 3
}
},
-- Real-time webhook for immediate alerts
{
type = "webhook",
uri = "https://hooks.security.com/cvedia-alerts",
enabled = true,
parameters = {
secret = api.config.get("webhook_secret"),
signature_header = "X-Security-Signature",
event_filters = {"zone_violation", "multiple_motion_events"}
}
},
-- Custom security script
{
type = "script",
uri = "/etc/cvedia/security/event_handler.lua",
enabled = true,
parameters = {
script_timeout = 60,
max_memory = "50MB"
}
}
},
processing = {
queue_size = 5000,
batch_size = 5,
batch_timeout = 1000,
worker_threads = 4,
retry_attempts = 5,
retry_delay = 2000
},
filtering = {
event_types = {"motion_detected", "zone_violation", "tampering_detected"},
min_confidence = 0.6,
zone_filters = securityConfig.high_priority_zones
}
})
-- Load event action configuration
local success = eventAction:loadConfig(eventAction.config)
if not success then
api.logging.LogError("Failed to initialize security event system")
return false
end
-- Setup event monitoring
setupEventMonitoring()
-- Start event statistics tracking
startEventStatisticsTracking()
api.logging.LogInfo("Security event system initialized successfully")
return true
end
-- Process security events from motion detection
function processSecurityMotionEvent(detection)
eventStats.total_events = eventStats.total_events + 1
eventStats.motion_events = eventStats.motion_events + 1
local currentTime = os.time()
-- Create comprehensive event data
local eventData = {
event_id = api.system.generateUUID(),
event_type = "motion_detected",
timestamp = currentTime,
source_id = instance.name,
confidence = detection.confidence,
metadata = {
detection_area = detection.area,
bounding_box = {
x = detection.x,
y = detection.y,
w = detection.w,
h = detection.h
},
zone = determineSecurityZone(detection),
camera_location = api.config.get("camera_location", "unknown"),
system_info = {
version = "1.0.0", -- Replace with actual version retrieval
uptime = os.time() -- Replace with actual uptime if available
}
}
}
-- Determine event priority
local priority = determineEventPriority(eventData)
-- Process based on priority
if priority == "critical" then
processCriticalSecurityEvent(eventData)
elseif priority == "high" then
processHighPriorityEvent(eventData)
else
processStandardEvent(eventData)
end
end
-- Process critical security events
function processCriticalSecurityEvent(eventData)
api.logging.LogWarning("CRITICAL SECURITY EVENT: " .. (eventData.event_type or "unknown"))
-- Immediate processing (bypass queue)
eventAction:process(eventData)
-- Send immediate SMS alert
sendImmediateSMSAlert(eventData)
-- Note: Emergency recording would require implementation
-- Actual implementation depends on storage plugin availability
api.logging.LogInfo("Emergency recording requested for event: " .. eventData.event_id)
eventStats.alerts_sent = eventStats.alerts_sent + 1
end
-- Determine event priority based on various factors
function determineEventPriority(eventData)
local zone = eventData.metadata.zone
local area = eventData.metadata.detection_area
local confidence = eventData.confidence
-- Critical: High confidence in restricted zones
if confidence > 0.9 and isHighSecurityZone(zone) then
return "critical"
end
-- High: Large movement in monitored areas
if area > 10000 and confidence > 0.7 then
return "high"
end
-- Check for multiple events (escalation)
if checkEventEscalation(eventData) then
return "high"
end
return "standard"
end
-- Check for event escalation patterns
function checkEventEscalation(eventData)
local currentTime = eventData.timestamp
local recentEvents = getRecentEvents(currentTime - securityConfig.escalation_rules.motion_timeout)
-- Count recent motion events
local motionCount = 0
for _, event in ipairs(recentEvents) do
if event.event_type == "motion_detected" then
motionCount = motionCount + 1
end
end
-- Escalate if too many motion events
if motionCount >= securityConfig.escalation_rules.motion_threshold then
api.logging.LogWarning("Event escalation triggered: " .. motionCount .. " motion events in " .. securityConfig.escalation_rules.motion_timeout .. " seconds")
-- Create escalation event
local escalationEvent = {
event_id = "evt_" .. os.time() .. "_" .. math.random(1000, 9999),
event_type = "multiple_motion_events",
timestamp = currentTime,
source_id = instance.name,
metadata = {
escalation_reason = "multiple_motion_threshold",
event_count = motionCount,
time_window = securityConfig.escalation_rules.motion_timeout,
related_events = extractEventIds(recentEvents)
}
}
eventAction:process(escalationEvent)
return true
end
return false
end
-- Send immediate SMS alert for critical events
function sendImmediateSMSAlert(eventData)
local smsData = {
to = api.config.get("security_phone"),
message = string.format(
"SECURITY ALERT: %s detected at %s. Zone: %s. Confidence: %.2f",
eventData.event_type,
os.date("%H:%M:%S", eventData.timestamp),
eventData.metadata.zone,
eventData.confidence
),
priority = "urgent"
}
api.http.post(securityConfig.notification_endpoints.sms, smsData, {
timeout = 10,
headers = {
["Authorization"] = "Bearer " .. api.config.get("sms_api_token")
}
})
end
-- Monitor event processing health
function setupEventMonitoring()
-- Implement periodic health checks in main loop
-- Check every 60 seconds
local function performHealthCheck()
local queueStats = eventAction:getQueueStats()
if queueStats then
local queueSize = queueStats.current_size or 0
local maxSize = queueStats.max_size or 1000
if queueSize > maxSize * 0.8 then
api.logging.LogWarning("Event queue nearly full: " .. queueSize .. "/" .. maxSize .. " events")
end
-- Log processing statistics
api.logging.LogDebug("Event Processing: Queue: " .. queueSize .. ", Processed: " .. (queueStats.processed_count or 0) .. ", Failed: " .. (queueStats.failed_count or 0))
end
end)
end
-- Track event statistics
function startEventStatisticsTracking()
-- Implement periodic statistics logging in main loop
-- Log every 5 minutes (300 seconds)
local function logStatistics()
api.logging.LogInfo("Security Event Statistics:")
api.logging.LogInfo(" Total Events: " .. (eventStats.total_events or 0))
api.logging.LogInfo(" Motion Events: " .. (eventStats.motion_events or 0))
api.logging.LogInfo(" Zone Violations: " .. (eventStats.zone_violations or 0))
api.logging.LogInfo(" Alerts Sent: " .. (eventStats.alerts_sent or 0))
api.logging.LogInfo(" Failed Actions: " .. (eventStats.failed_actions or 0))
-- Reset counters periodically for rate calculation
if eventStats.total_events > 10000 then
eventStats = {
total_events = 0,
motion_events = 0,
zone_violations = 0,
alerts_sent = 0,
failed_actions = 0
}
end
end)
end
-- Initialize the security event system
initializeSecurityEventSystem()
-- info("Security event processing system is active")
Best Practices¶
Performance Optimization¶
- Queue Management: Configure appropriate queue sizes based on expected event volume
- Batch Processing: Use event batching for high-volume scenarios to improve throughput
- Worker Threads: Scale worker threads based on available CPU cores and I/O requirements
- Timeout Configuration: Set reasonable timeouts to prevent hanging operations
Reliability Guidelines¶
- Retry Logic: Implement exponential backoff for retry attempts
- Error Handling: Handle network failures and service unavailability gracefully
- Monitoring: Monitor queue sizes and processing rates for system health
- Failover: Configure backup endpoints for critical event delivery
Security Considerations¶
- Authentication: Use secure authentication methods for REST API and webhook endpoints
- Encryption: Ensure HTTPS/TLS for all network communications
- Secret Management: Store API keys and secrets securely
- Event Filtering: Implement proper event filtering to avoid information leakage
Integration Guidelines¶
- Event Schema: Use consistent event schemas across all actions
- Idempotency: Ensure event processing is idempotent for retry scenarios
- Logging: Implement comprehensive logging for audit and debugging
- Configuration Management: Use environment variables for sensitive configuration
Troubleshooting¶
Common Issues¶
High Queue Usage¶
// Monitor and adjust queue configuration
auto queueStats = eventAction->getQueueStats();
if (queueStats && queueStats->get("utilization").getFloat() > 0.8f) {
// Increase queue size or worker threads
auto config = eventAction->getConfig();
config->set("queue_size", 5000);
config->set("worker_threads", 4);
eventAction->loadConfig(config);
}
Failed REST API Calls¶
- Network Connectivity: Verify network access to endpoints
- Authentication: Check API credentials and token expiration
- Rate Limiting: Implement proper rate limiting and retry logic
- SSL/TLS Issues: Verify certificate validity and SSL configuration
Script Execution Failures¶
- Script Syntax: Validate Lua script syntax and dependencies
- Memory Limits: Monitor script memory usage and adjust limits
- Timeout Issues: Increase script timeout for complex processing
- Permission Issues: Ensure script files have proper read permissions
Disk Storage Issues¶
- Disk Space: Monitor available disk space for log storage
- File Permissions: Verify write permissions for log directories
- Log Rotation: Configure proper log rotation to prevent disk full
- I/O Performance: Monitor disk I/O performance under high load
Debugging Tools¶
// Event action diagnostics
void diagnoseEventActions(EventActionManaged* eventAction) {
// Get queue statistics
auto queueStats = eventAction->getQueueStats();
if (queueStats) {
LOGI << "Queue Statistics:";
LOGI << " Current Size: " << queueStats->get("current_size").getInt();
LOGI << " Max Size: " << queueStats->get("max_size").getInt();
LOGI << " Processed Count: " << queueStats->get("processed_count").getInt();
LOGI << " Failed Count: " << queueStats->get("failed_count").getInt();
}
// Get action configurations
auto actions = eventAction->getEventActions();
if (actions) {
LOGI << "Configured Actions:";
for (const auto& action : actions.value()) {
auto type = action->get("type").getString();
auto uri = action->get("uri").getString();
auto enabled = action->get("enabled").getBool();
LOGI << " Action: " << type << " -> " << uri << " (enabled: " << enabled << ")";
}
}
}
Integration Examples¶
Video Management System Integration¶
// Complete VMS integration with event actions
class VMSEventIntegration {
public:
void initialize() {
// Initialize event processing
initializeEventActions();
// Setup VMS-specific actions
setupVMSActions();
// Configure event routing
setupEventRouting();
}
void processVMSEvent(const VMSEvent& event) {
// Convert VMS event to standard format
auto eventData = convertVMSEvent(event);
// Route based on event type and priority
routeVMSEvent(eventData);
// Update VMS database
updateVMSDatabase(event);
}
private:
void setupVMSActions() {
// Configure VMS-specific REST endpoints
// Configure database logging
// Setup alert escalation
}
pCValue convertVMSEvent(const VMSEvent& event) {
auto eventData = CValue::create();
eventData->set("event_id", event.id);
eventData->set("event_type", event.type);
eventData->set("timestamp", event.timestamp);
eventData->set("camera_id", event.cameraId);
eventData->set("metadata", event.metadata);
return eventData;
}
};
See Also¶
- Motion Plugin - Motion detection event generation
- Processing Plugins Overview - All processing plugins
- Plugin Overview - Complete plugin ecosystem
- Output Plugins - Output plugin integration
- Event System Guide - Event system architecture patterns