LuaEngine Plugin

Description

LuaEngine is CVEDIA-RT's embedded Lua scripting system. It is built on Sol2 (a C++ Lua binding library) and implements the ScriptEngine interface, so custom logic, workflow automation, and dynamic configuration can be written as Lua scripts that run inside the AI processing pipeline.

This plugin serves as the core scripting foundation for CVEDIA-RT, allowing users to create custom processing logic, implement complex decision trees, integrate with external systems, and automate workflows without requiring recompilation of the core system. It provides full access to CVEDIA-RT's C++ APIs through comprehensive Lua bindings.

Key Features

  • Full Lua 5.4 Environment: Complete Lua runtime with all standard libraries
  • Sol2 Integration: Modern C++ Lua bindings for seamless interoperability
  • Hot Reloading: Automatic script reload during development
  • Thread-Safe Execution: Protected concurrent script execution
  • Comprehensive API Access: Full access to CVEDIA-RT C++ APIs through Lua
  • Dynamic Script Loading: Load and execute scripts at runtime
  • Error Handling: Robust error management with detailed stack traces
  • Performance Monitoring: Tracy profiler integration for optimization
  • Plugin Integration: Seamless integration with other CVEDIA-RT plugins
  • Development Tools: Built-in debugging and development support

Use Cases

  • Custom Processing Logic: Implement domain-specific processing algorithms
  • Workflow Automation: Automate complex multi-step workflows
  • Event Handling: Create custom event processing and response logic
  • System Integration: Interface with external APIs and services
  • Rapid Prototyping: Quickly test and iterate on new ideas
  • Configuration Management: Dynamic configuration and parameter tuning
  • Business Rules: Implement complex business logic and decision trees
  • Data Transformation: Custom data processing and transformation pipelines

Requirements

Hardware Requirements

  • CPU: Multi-core processor recommended for concurrent script execution
  • Memory: Minimum 512MB RAM (2GB+ recommended for complex scripts)
  • Storage: Sufficient space for script files and temporary data

Software Dependencies

  • Lua 5.4: Embedded Lua runtime environment
  • Sol2: Modern C++ Lua binding library
  • RTCORE: CVEDIA-RT core library for plugin infrastructure
  • Threading Library: Multi-threading support for concurrent execution
  • File System Libraries: File I/O operations for script loading

Build Requirements

  • CMake: Build system, configured with the BUILD_LUA_LIB option enabled
  • C++17: Modern C++ standard for Sol2 compatibility
  • Tracy Profiler: Optional performance monitoring integration

Configuration

Basic Script Configuration

{
  "luaengine": {
    "script": "assets/scripts/my_script.lua",
    "auto_reload": true,
    "enable_debugging": true,
    "enable_profiling": false
  }
}

Advanced Configuration

{
  "luaengine": {
    "script": "/absolute/path/to/script.lua",
    "auto_reload": false,
    "enable_debugging": true,
    "enable_profiling": true,
    "lua_paths": [
      "./custom_modules/?.lua",
      "./shared/?.lua"
    ],
    "environment_variables": {
      "PROJECT_NAME": "MyProject",
      "API_ENDPOINT": "https://api.example.com"
    },
    "garbage_collection": {
      "auto_collect": true,
      "collection_interval": 1000
    }
  }
}

Configuration Schema

| Parameter | Type | Default | Description |
|---|---|---|---|
| script | string | "" | Path to Lua script file (relative or absolute) |
| auto_reload | bool | false | Enable automatic script reloading on file changes |
| enable_debugging | bool | true | Enable detailed error reporting and stack traces |
| enable_profiling | bool | false | Enable Tracy profiler integration |
| lua_paths | array | [] | Additional Lua package search paths |
| environment_variables | object | {} | Custom environment variables for script access |
| garbage_collection.auto_collect | bool | true | Enable automatic garbage collection after execution |
| garbage_collection.collection_interval | int | 0 | Manual collection interval in milliseconds (0 = disabled) |
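
The lua_paths entries use Lua's standard search-template syntax, where "?" stands for the module name passed to require and templates are separated by ";". The sketch below only illustrates those semantics. How LuaEngine applies lua_paths internally is not documented here, and the helper name appendSearchPaths is hypothetical.

```lua
-- Hypothetical helper: append search templates to a package.path-style string.
-- This illustrates standard Lua search-path rules, not LuaEngine internals.
function appendSearchPaths(basePath, extraPaths)
    local parts = { basePath }
    for _, template in ipairs(extraPaths) do
        parts[#parts + 1] = template
    end
    -- Templates are joined with ';' and tried in order by require
    return table.concat(parts, ";")
end

local combined = appendSearchPaths("./?.lua", {
    "./custom_modules/?.lua",
    "./shared/?.lua",
})
print(combined)

-- With these templates, require("geometry") would look for ./geometry.lua,
-- then ./custom_modules/geometry.lua, then ./shared/geometry.lua.
```

Order matters: a module with the same name in an earlier template shadows later ones.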

API Reference

C++ API (ScriptEngine Interface)

Core Script Execution

class LuaEngine : public ScriptEngine {
public:
    // Constructor with script path and auto-reload option
    LuaEngine(const std::string& script = "", bool autoReload = false);

    // Core execution methods
    std::vector<pCValue> callLogic(pCValue ctxData, std::string method) override;
    expected<std::vector<pCValue>> executeScript(
        vector<pCValue> const& stateData,
        string const& luaMethod,
        int numResults,
        string const& pluginName,
        string const& className,
        Module* self
    ) override;

    // String execution
    std::vector<pCValue> executeString(string const& luaCode);

    // Script management
    expected<void> loadScript(string const& scriptName);
    bool isScriptLoaded(string const scriptName) const;
    bool scriptLoadedOk() const;
    void clearLoadedFilesList();

    // Runtime control
    void setAutoReload(bool state);
    void collectGarbage();

    // Thread-safe Lua state access
    LuaState getLua(); // RAII wrapper for thread safety
};

Thread-Safe Lua State Access

// RAII wrapper for thread-safe Lua state access
class LuaState {
public:
    LuaState(std::shared_ptr<sol::state> state, std::mutex& mutex);
    ~LuaState();

    // Operators for Sol2 state access
    sol::state& operator*();
    sol::state* operator->();

    // Direct access (use with caution)
    sol::state& getState();
};

Factory Creation

// Create LuaEngine instance
auto scriptEngine = api::factory::ScriptEngine::create();

// Load and execute script
scriptEngine->loadScript("assets/scripts/my_processing_logic.lua");

// Execute specific function
auto results = scriptEngine->callLogic(inputData, "processDetections");

Lua API Access

CVEDIA-RT API Bindings

-- Access CVEDIA-RT APIs through the 'api' global
local factory = api.factory
local system = api.system
local filesystem = api.filesystem

-- Buffer management
local bufferMgr = api.bufferMgr

-- Plugin-specific APIs
local inference = api.factory.inference.create()
local tracker = api.factory.tracker.create()

Built-in Environment Variables

-- Environment variables automatically available
api.logging.LogInfo("Home directory: " .. homedir)
api.logging.LogInfo("Lua root: " .. luaroot)
api.logging.LogInfo("Project root: " .. project_root)

-- Custom environment variables (from configuration)
api.logging.LogInfo("API Endpoint: " .. (os.getenv("API_ENDPOINT") or "not set"))

Script Structure Template

-- Optional: Override auto-reload setting
auto_reload = true

-- Initialization function (called once)
function initialize()
    api.logging.LogInfo("Script initialized")
    -- Setup code here
end

-- Main processing function
function processFrame(frameData)
    -- Process frame data
    local detections = frameData.detections or {}

    -- Custom processing logic
    for i, detection in ipairs(detections) do
        -- Process each detection
        processDetection(detection)
    end

    -- Return processed data
    return frameData
end

-- Event handling function
function handleEvent(eventData)
    api.logging.LogInfo("Received event: " .. eventData.type)

    -- Custom event processing
    if eventData.type == "motion_detected" then
        handleMotionEvent(eventData)
    elseif eventData.type == "object_detected" then
        handleObjectEvent(eventData)
    end
end

-- Cleanup function (called on script unload)
function cleanup()
    api.logging.LogInfo("Script cleanup")
    -- Cleanup code here
end

Examples

Basic Detection Processing Script

-- Basic detection processing with custom logic
local factory = api.factory

-- Configuration
local config = {
    min_confidence = 0.7,
    max_detections = 10,
    excluded_classes = {"background", "unknown"}
}

-- Initialize script
function initialize()
    api.logging.LogInfo("Detection processing script initialized")
end

-- Main processing function
function processDetections(inputData)
    local detections = inputData.detections or {}
    local filteredDetections = {}

    api.logging.LogInfo("Processing " .. #detections .. " detections")

    -- Filter detections based on confidence and class
    for i, detection in ipairs(detections) do
        if isValidDetection(detection) then
            -- Add custom metadata
            detection.processed_timestamp = os.time()
            detection.script_version = "1.0.0"

            table.insert(filteredDetections, detection)

            -- Stop if we have enough detections
            if #filteredDetections >= config.max_detections then
                break
            end
        end
    end

    api.logging.LogInfo("Filtered to " .. #filteredDetections .. " valid detections")

    -- Return processed data
    local result = {
        detections = filteredDetections,
        processing_info = {
            original_count = #detections,
            filtered_count = #filteredDetections,
            script_name = "detection_processor"
        }
    }

    return result
end

-- Helper function to validate detection
function isValidDetection(detection)
    -- Check confidence threshold
    if detection.confidence < config.min_confidence then
        return false
    end

    -- Check if class is excluded
    for _, excluded in ipairs(config.excluded_classes) do
        if detection.class_name == excluded then
            return false
        end
    end

    -- Check bounding box validity
    if detection.x < 0 or detection.y < 0 or 
       detection.w <= 0 or detection.h <= 0 then
        return false
    end

    return true
end

Advanced Event Processing System

-- Advanced event processing with external API integration
local json = dofile(luaroot .. "/api/json.lua")

-- Event processing state
local eventState = {
    totalEvents = 0,
    eventsByType = {},
    lastEventTime = 0,
    alertCooldowns = {}
}

-- Configuration
local eventConfig = {
    api_endpoint = "https://api.example.com/events",
    alert_cooldown = 30, -- seconds
    batch_size = 10,
    retry_attempts = 3
}

-- Event batch for API submission
local eventBatch = {}

-- Initialize event processing
function initialize()
    api.logging.LogInfo("Advanced event processing system initialized")
    eventState.frameCounter = 0
    eventState.lastBatchProcess = os.time()
end

-- Main event handler
function handleEvent(eventData)
    local currentTime = os.time()

    -- Update statistics
    eventState.totalEvents = eventState.totalEvents + 1
    eventState.lastEventTime = currentTime

    -- Track events by type
    local eventType = eventData.type or "unknown"
    eventState.eventsByType[eventType] = (eventState.eventsByType[eventType] or 0) + 1

    api.logging.LogInfo("Processing event: " .. eventType .. " (total: " .. eventState.totalEvents .. ")")

    -- Process specific event types
    if eventType == "motion_detected" then
        handleMotionEvent(eventData, currentTime)
    elseif eventType == "object_detected" then
        handleObjectEvent(eventData, currentTime)
    elseif eventType == "zone_breach" then
        handleZoneBreachEvent(eventData, currentTime)
    elseif eventType == "system_alert" then
        handleSystemAlert(eventData, currentTime)
    else
        handleGenericEvent(eventData, currentTime)
    end

    -- Add to batch for API submission
    addEventToBatch(eventData, currentTime)
end

-- Handle motion detection events
function handleMotionEvent(eventData, timestamp)
    local motionInfo = eventData.motion_info or {}

    -- Check for significant motion
    if motionInfo.area and motionInfo.area > 5000 then
        local alertKey = "significant_motion"

        if shouldTriggerAlert(alertKey, timestamp) then
            triggerAlert({
                type = "significant_motion",
                message = "Large motion area detected",
                area = motionInfo.area,
                timestamp = timestamp,
                location = motionInfo.center
            })

            eventState.alertCooldowns[alertKey] = timestamp
        end
    end
end

-- Handle object detection events
function handleObjectEvent(eventData, timestamp)
    local detections = eventData.detections or {}

    -- Count high-confidence detections
    local highConfidenceCount = 0
    for _, detection in ipairs(detections) do
        if detection.confidence > 0.8 then
            highConfidenceCount = highConfidenceCount + 1
        end
    end

    -- Alert on multiple high-confidence detections
    if highConfidenceCount >= 3 then
        local alertKey = "multiple_objects"

        if shouldTriggerAlert(alertKey, timestamp) then
            triggerAlert({
                type = "multiple_objects",
                message = string.format("%d high-confidence objects detected", highConfidenceCount),
                count = highConfidenceCount,
                timestamp = timestamp
            })

            eventState.alertCooldowns[alertKey] = timestamp
        end
    end
end

-- Handle zone breach events
function handleZoneBreachEvent(eventData, timestamp)
    local zoneInfo = eventData.zone_info or {}
    local zoneName = zoneInfo.name or "Unknown Zone"

    api.logging.LogWarning(string.format("Zone breach detected: %s", zoneName))

    -- Always trigger zone breach alerts (high priority)
    triggerAlert({
        type = "zone_breach",
        message = string.format("Security zone breached: %s", zoneName),
        zone_name = zoneName,
        zone_id = zoneInfo.id,
        timestamp = timestamp,
        priority = "high"
    })
end

-- Handle system alerts
function handleSystemAlert(eventData, timestamp)
    local severity = eventData.severity or "info"
    local message = eventData.message or "System alert"

    api.logging.LogInfo(string.format("System alert [%s]: %s", severity, message))

    -- Forward critical system alerts immediately
    if severity == "critical" or severity == "error" then
        sendImmediateAlert({
            type = "system_critical",
            message = message,
            severity = severity,
            timestamp = timestamp
        })
    end
end

-- Check if alert should be triggered (considering cooldown)
function shouldTriggerAlert(alertKey, timestamp)
    local lastAlert = eventState.alertCooldowns[alertKey] or 0
    return (timestamp - lastAlert) >= eventConfig.alert_cooldown
end

-- Trigger alert (add to batch)
function triggerAlert(alertData)
    api.logging.LogWarning(string.format("ALERT: %s - %s", alertData.type, alertData.message))

    -- Add alert flag to event data
    alertData.is_alert = true
    alertData.alert_id = generateAlertId()

    -- Add to batch with high priority
    table.insert(eventBatch, 1, alertData) -- Insert at beginning for priority
end

-- Send immediate alert (bypass batch)
function sendImmediateAlert(alertData)
    alertData.immediate = true

    local success = sendEventToAPI(alertData)
    if success then
        api.logging.LogInfo("Immediate alert sent successfully")
    else
        api.logging.LogError("Failed to send immediate alert")
        -- Add to batch as fallback
        table.insert(eventBatch, alertData)
    end
end

-- Add event to batch for API submission
function addEventToBatch(eventData, timestamp)
    -- Add processing metadata
    eventData.processed_at = timestamp
    eventData.script_version = "2.0.0"

    table.insert(eventBatch, eventData)

    -- Process batch if it's full
    if #eventBatch >= eventConfig.batch_size then
        processBatchEvents()
    end
end

-- Process batched events
function processBatchEvents()
    if #eventBatch == 0 then
        return
    end

    api.logging.LogInfo(string.format("Processing batch of %d events", #eventBatch))

    -- Create batch payload
    local batchPayload = {
        events = eventBatch,
        batch_info = {
            size = #eventBatch,
            timestamp = os.time(),
            source = "cvedia-rt-luaengine"
        },
        statistics = {
            total_events = eventState.totalEvents,
            events_by_type = eventState.eventsByType
        }
    }

    -- Send to API with retry logic
    local success = sendBatchToAPI(batchPayload)

    if success then
        api.logging.LogInfo("Event batch sent successfully")
        eventBatch = {} -- Clear batch
    else
        api.logging.LogError("Failed to send event batch, will retry")
        -- Keep events in batch for retry
    end
end

-- Send batch to external API
function sendBatchToAPI(batchData)
    for attempt = 1, eventConfig.retry_attempts do
        api.logging.LogInfo(string.format("Sending batch to API (attempt %d/%d)", attempt, eventConfig.retry_attempts))

        local success = sendEventToAPI(batchData)
        if success then
            return true
        end

        -- Mark for retry in next processing cycle
        if attempt < eventConfig.retry_attempts then
            eventBatch.retryCount = (eventBatch.retryCount or 0) + 1
            return false
        end
    end

    return false
end

-- Send single event to API
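-- Assumption: a LuaSocket-style HTTP client is available on the Lua search path.
-- For an https:// endpoint, LuaSec's ssl.https offers the same request interface.
local http = require("socket.http")
local ltn12 = require("ltn12")

function sendEventToAPI(eventData)
    local jsonData = json.encode(eventData)

    -- Make HTTP request (simplified example)
    local result, status = http.request{
        url = eventConfig.api_endpoint,
        method = "POST",
        headers = {
            ["Content-Type"] = "application/json",
            ["Content-Length"] = tostring(#jsonData)
        },
        -- The request body must be an LTN12 source, not a raw string
        source = ltn12.source.string(jsonData)
    }

    return status == 200
end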

-- Generate unique alert ID
function generateAlertId()
    local timestamp = os.time()
    local random = math.random(1000, 9999)
    return string.format("alert_%d_%d", timestamp, random)
end

-- Handle generic events
function handleGenericEvent(eventData, timestamp)
    -- Log unknown event types for monitoring
    api.logging.LogInfo(string.format("Generic event processed: %s", 
                json.encode(eventData)))
end

-- Cleanup function
function cleanup()
    -- Process any remaining events in batch
    if #eventBatch > 0 then
        api.logging.LogInfo("Processing remaining events before cleanup")
        processBatchEvents()
    end

    -- Log final statistics
    api.logging.LogInfo("Event Processing Statistics:")
    api.logging.LogInfo(string.format("  Total Events: %d", eventState.totalEvents))
    api.logging.LogInfo("  Events by Type:")
    for eventType, count in pairs(eventState.eventsByType) do
        api.logging.LogInfo(string.format("    %s: %d", eventType, count))
    end

    api.logging.LogInfo("Advanced event processing system cleanup complete")
end
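
The cooldown check in shouldTriggerAlert above can be exercised in isolation. The sketch below is a standalone version with a simulated timeline. The function names cooldownElapsed and recordAlert are hypothetical, and the 30-second value mirrors eventConfig.alert_cooldown.

```lua
-- Standalone sketch of the cooldown gate; not tied to the CVEDIA-RT API
local cooldownSeconds = 30
local lastAlertAt = {}

-- True when at least cooldownSeconds have passed since the last alert for this key
function cooldownElapsed(alertKey, now)
    local last = lastAlertAt[alertKey] or 0
    return (now - last) >= cooldownSeconds
end

function recordAlert(alertKey, now)
    lastAlertAt[alertKey] = now
end

-- Simulated timeline in seconds: alerts requested at t = 1000, 1010 and 1030
for _, t in ipairs({ 1000, 1010, 1030 }) do
    if cooldownElapsed("significant_motion", t) then
        recordAlert("significant_motion", t)
        print(t, "alert sent")
    else
        print(t, "suppressed by cooldown")
    end
end
```

Here the request at t = 1010 is suppressed because only 10 seconds have passed, while the one at t = 1030 goes through because the comparison uses ">=".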

Custom AI Pipeline Integration

-- Custom AI pipeline with multi-model inference
local factory = api.factory

-- AI Pipeline Components
local detectionModel = nil
local classificationModel = nil
local trackingSystem = nil

-- Pipeline configuration
local pipelineConfig = {
    detection_threshold = 0.6,
    classification_threshold = 0.7,
    tracking_enabled = true,
    max_age = 30, -- frames
    min_hits = 3
}

-- Performance metrics
local metrics = {
    frames_processed = 0,
    detections_total = 0,
    classifications_total = 0,
    tracking_active = 0,
    processing_times = {}
}

-- Initialize AI pipeline
function initialize()
    api.logging.LogInfo("Initializing custom AI pipeline")

    -- Create detection model
    detectionModel = factory.inference.create(instance, "object_detector")
    if detectionModel then
        detectionModel:configure({
            model_path = "models/yolo_v5.onnx",
            confidence_threshold = pipelineConfig.detection_threshold,
            nms_threshold = 0.4
        })
        api.logging.LogInfo("Detection model initialized")
    end

    -- Create classification model
    classificationModel = factory.inference.create(instance, "classifier")
    if classificationModel then
        classificationModel:configure({
            model_path = "models/resnet50_classifier.onnx",
            confidence_threshold = pipelineConfig.classification_threshold
        })
        api.logging.LogInfo("Classification model initialized")
    end

    -- Create tracking system
    if pipelineConfig.tracking_enabled then
        trackingSystem = factory.tracker.create(instance, "multi_tracker")
        if trackingSystem then
            trackingSystem:configure({
                max_age = pipelineConfig.max_age,
                min_hits = pipelineConfig.min_hits,
                iou_threshold = 0.3
            })
            api.logging.LogInfo("Tracking system initialized")
        end
    end

    api.logging.LogInfo("Custom AI pipeline initialization complete")
end

-- Main processing function
function processFrame(frameData)
    local startTime = os.clock() -- Use os.clock() for performance timing

    metrics.frames_processed = metrics.frames_processed + 1

    -- Step 1: Object Detection
    local detections = runObjectDetection(frameData)
    if not detections or #detections == 0 then
        return frameData -- No objects detected
    end

    metrics.detections_total = metrics.detections_total + #detections

    -- Step 2: Classification
    local classifiedDetections = runClassification(frameData, detections)
    metrics.classifications_total = metrics.classifications_total + #classifiedDetections

    -- Step 3: Tracking (if enabled)
    local trackedDetections = classifiedDetections
    if trackingSystem then
        trackedDetections = runTracking(frameData, classifiedDetections)
        metrics.tracking_active = #trackedDetections
    end

    -- Step 4: Post-processing
    local finalResults = postProcessResults(trackedDetections)

    -- Calculate processing time (os.clock() returns seconds; convert to ms)
    local endTime = os.clock()
    local processingTime = (endTime - startTime) * 1000
    table.insert(metrics.processing_times, processingTime)

    -- Keep only last 100 processing times
    if #metrics.processing_times > 100 then
        table.remove(metrics.processing_times, 1)
    end

    -- Log performance periodically
    if metrics.frames_processed % 100 == 0 then
        logPerformanceMetrics(processingTime)
    end

    -- Return processed frame data
    frameData.detections = finalResults
    frameData.pipeline_info = {
        processing_time_ms = processingTime,
        detection_count = #detections,
        classification_count = #classifiedDetections,
        tracking_count = #trackedDetections
    }

    return frameData
end

-- Run object detection
function runObjectDetection(frameData)
    if not detectionModel then
        return {}
    end

    local detections = detectionModel:process(frameData.buffer)

    -- Filter by confidence
    local filteredDetections = {}
    for _, detection in ipairs(detections) do
        if detection.confidence >= pipelineConfig.detection_threshold then
            table.insert(filteredDetections, detection)
        end
    end

    api.logging.LogDebug(string.format("Object detection: %d objects found (%d after filtering)",
                 #detections, #filteredDetections))

    return filteredDetections
end

-- Run classification on detections
function runClassification(frameData, detections)
    if not classificationModel then
        return detections
    end

    local classifiedDetections = {}

    for _, detection in ipairs(detections) do
        -- Extract crop for classification
        local crop = extractCrop(frameData.buffer, detection)
        if crop then
            local classification = classificationModel:process(crop)

            -- Add classification results to detection
            if classification and classification.confidence >= pipelineConfig.classification_threshold then
                detection.classification = {
                    class_name = classification.class_name,
                    confidence = classification.confidence,
                    top_5 = classification.top_5 or {}
                }

                table.insert(classifiedDetections, detection)
            end
        end
    end

    api.logging.LogDebug(string.format("Classification: %d objects classified from %d detections",
                 #classifiedDetections, #detections))

    return classifiedDetections
end

-- Run tracking on detections
function runTracking(frameData, detections)
    if not trackingSystem then
        return detections
    end

    local trackedDetections = trackingSystem:update(detections)

    -- Add tracking information
    for _, detection in ipairs(trackedDetections) do
        if detection.track_id then
            detection.tracking_info = {
                track_id = detection.track_id,
                age = detection.age or 0,
                hits = detection.hits or 0,
                velocity = detection.velocity or {x = 0, y = 0},
                trajectory = detection.trajectory or {}
            }
        end
    end

    api.logging.LogDebug(string.format("Tracking: %d objects tracked from %d detections",
                 #trackedDetections, #detections))

    return trackedDetections
end

-- Post-process results
function postProcessResults(detections)
    local finalResults = {}

    for _, detection in ipairs(detections) do
        -- Add custom metadata
        detection.pipeline_version = "1.2.0"
        detection.processing_timestamp = os.time()

        -- Calculate additional metrics
        detection.area = detection.w * detection.h
        detection.center = {
            x = detection.x + detection.w / 2,
            y = detection.y + detection.h / 2
        }

        -- Quality scoring
        detection.quality_score = calculateQualityScore(detection)

        table.insert(finalResults, detection)
    end

    -- Sort by confidence (highest first)
    table.sort(finalResults, function(a, b)
        return a.confidence > b.confidence
    end)

    return finalResults
end

-- Calculate quality score for detection
function calculateQualityScore(detection)
    local score = 0.0

    -- Base score from detection confidence
    score = score + (detection.confidence * 0.4)

    -- Bonus for classification
    if detection.classification then
        score = score + (detection.classification.confidence * 0.3)
    end

    -- Bonus for tracking stability
    if detection.tracking_info and detection.tracking_info.hits >= pipelineConfig.min_hits then
        score = score + 0.2
    end

    -- Size factor (prefer medium-sized objects)
    local area = detection.area or 0
    if area > 1000 and area < 50000 then
        score = score + 0.1
    end

    return math.min(1.0, score)
end

-- Extract crop from buffer for classification
function extractCrop(buffer, detection)
    -- This would use CVEDIA-RT's buffer management to extract crop
    -- Simplified example - actual implementation would use BufferMgr
    local crop = api.bufferMgr.extractRegion(buffer, {
        x = detection.x,
        y = detection.y,
        w = detection.w,
        h = detection.h
    })

    return crop
end

-- Log performance metrics
function logPerformanceMetrics(currentTime)
    local avgTime = 0
    if #metrics.processing_times > 0 then
        local sum = 0
        for _, time in ipairs(metrics.processing_times) do
            sum = sum + time
        end
        avgTime = sum / #metrics.processing_times
    end

    api.logging.LogInfo("AI Pipeline Performance Metrics:")
    api.logging.LogInfo(string.format("  Frames Processed: %d", metrics.frames_processed))
    api.logging.LogInfo(string.format("  Total Detections: %d", metrics.detections_total))
    api.logging.LogInfo(string.format("  Total Classifications: %d", metrics.classifications_total))
    api.logging.LogInfo(string.format("  Active Tracks: %d", metrics.tracking_active))
    api.logging.LogInfo(string.format("  Current Processing Time: %.2f ms", currentTime))
    api.logging.LogInfo(string.format("  Average Processing Time: %.2f ms", avgTime))

    local fps = 0
    if avgTime > 0 then
        fps = 1000 / avgTime
    end
    api.logging.LogInfo(string.format("  Estimated FPS: %.1f", fps))
end

-- Cleanup function
function cleanup()
    api.logging.LogInfo("Cleaning up custom AI pipeline")

    -- Log final metrics
    logPerformanceMetrics(0)

    -- Cleanup models and systems
    if detectionModel then
        detectionModel:cleanup()
    end

    if classificationModel then
        classificationModel:cleanup()
    end

    if trackingSystem then
        trackingSystem:cleanup()
    end

    api.logging.LogInfo("Custom AI pipeline cleanup complete")
end
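
The weights in calculateQualityScore above add up to at most 1.0: 0.4 for detection confidence, 0.3 for classification confidence, 0.2 for tracking stability, and 0.1 for size. The sketch below repeats that weighting as a standalone function, with min_hits passed as a parameter, and evaluates it on a hypothetical detection. The name qualityScore and the sample values are illustrative only.

```lua
-- Same weighting as calculateQualityScore, with minHits passed in explicitly
function qualityScore(detection, minHits)
    local score = detection.confidence * 0.4
    if detection.classification then
        score = score + detection.classification.confidence * 0.3
    end
    if detection.tracking_info and detection.tracking_info.hits >= minHits then
        score = score + 0.2
    end
    local area = detection.area or 0
    if area > 1000 and area < 50000 then
        score = score + 0.1
    end
    return math.min(1.0, score)
end

-- Hypothetical detection: 0.9 * 0.4 + 0.8 * 0.3 + 0.2 + 0.1 = 0.90
local sample = {
    confidence = 0.9,
    classification = { confidence = 0.8 },
    tracking_info = { hits = 5 },
    area = 120 * 80, -- 9600 square pixels, inside the preferred size band
}
print(string.format("%.2f", qualityScore(sample, 3)))
```

A detection with no classification, no tracking information, and an out-of-band size can score at most 0.4, so the score mainly separates well-corroborated detections from bare ones.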

Best Practices

Performance Optimization

  • Minimize Global State: Use local variables and avoid excessive global state
  • Efficient Data Structures: Use appropriate Lua data structures for your use case
  • Memory Management: Enable automatic garbage collection and monitor memory usage
  • Batch Operations: Process data in batches when possible to reduce overhead
  • Caching: Cache expensive computations and reuse results when appropriate
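
As an illustration of the caching advice, the following sketch memoizes a pure single-argument function. The helper memoize and the stand-in classPriority are hypothetical names. Results that are nil are not cached and will be recomputed on each call.

```lua
-- Hypothetical helper: cache results of a pure, single-argument function
function memoize(fn)
    local cache = {}
    return function(key)
        local value = cache[key]
        if value == nil then
            value = fn(key)
            cache[key] = value
        end
        return value
    end
end

local calls = 0
local function classPriority(className) -- stand-in for an expensive lookup
    calls = calls + 1
    return #className * 10
end

local cachedPriority = memoize(classPriority)
cachedPriority("person")
cachedPriority("person") -- served from the cache
print("underlying calls:", calls)
```

The cache grows with the number of distinct keys, so this fits small, bounded key sets such as class names better than per-frame values.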

Error Handling

  • Protected Calls: Use pcall for risky operations that might fail
  • Comprehensive Logging: Log errors with sufficient context for debugging
  • Graceful Degradation: Handle failures gracefully and continue processing when possible
  • Input Validation: Validate input data before processing
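
A minimal sketch of these points, using the standard xpcall and debug.traceback functions. The wrapper name safeCall is hypothetical. The logger falls back to print so the snippet also runs in a plain Lua interpreter, and inside CVEDIA-RT it would use api.logging.LogError as in the other examples.

```lua
-- Fall back to print so the sketch also runs outside CVEDIA-RT
local logError = (api and api.logging and api.logging.LogError) or print

-- Hypothetical wrapper: run fn, log failures with a traceback, return a fallback
function safeCall(fn, fallback, ...)
    local args = { n = select("#", ...), ... }
    local unpack = table.unpack or unpack
    local ok, result = xpcall(function()
        return fn(unpack(args, 1, args.n))
    end, debug.traceback)
    if not ok then
        logError("Script error: " .. tostring(result))
        return fallback
    end
    return result
end

-- Input validation plus graceful degradation: a malformed detection
-- yields the fallback value instead of aborting the whole frame
local function boxArea(detection)
    assert(type(detection) == "table", "detection must be a table")
    return detection.w * detection.h
end

print(safeCall(boxArea, 0, { w = 4, h = 5 }))
print(safeCall(boxArea, 0, nil))
```

Only the first return value of fn is passed through, which keeps the wrapper short. A variant that needs multiple results would have to collect them before checking ok.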

Development Guidelines

  • Hot Reloading: Use auto_reload = true during development for faster iteration
  • Modular Design: Split complex scripts into multiple files and modules
  • Documentation: Comment your code thoroughly for maintainability
  • Testing: Create test scripts to validate your logic before deployment
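
One possible way to test script logic before deployment is to run it under a plain Lua interpreter against a stub of the api table. The stub below is hypothetical and covers only the logging calls used in this document. The function under test takes the api table as a parameter so the stub can be swapped in.

```lua
-- Minimal stand-in for the 'api' global that records log lines for inspection
function makeStubApi()
    local lines = {}
    local function recorder(level)
        return function(message)
            lines[#lines + 1] = level .. ": " .. tostring(message)
        end
    end
    return {
        logging = {
            LogInfo = recorder("INFO"),
            LogWarning = recorder("WARN"),
            LogError = recorder("ERROR"),
            LogDebug = recorder("DEBUG"),
        },
    }, lines
end

-- Logic under test: counts detections at or above a confidence threshold
function countConfident(apiTable, detections, minConfidence)
    local count = 0
    for _, detection in ipairs(detections) do
        if detection.confidence >= minConfidence then
            count = count + 1
        end
    end
    apiTable.logging.LogInfo("confident detections: " .. count)
    return count
end

local stub, lines = makeStubApi()
local n = countConfident(stub, { { confidence = 0.9 }, { confidence = 0.4 } }, 0.7)
assert(n == 1)
assert(lines[1] == "INFO: confident detections: 1")
print("ok")
```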

Integration Patterns

  • Plugin Cooperation: Coordinate with other plugins through shared data structures
  • Event-Driven: Use event handlers for reactive programming patterns
  • Configuration Management: Use external configuration files for flexibility
  • API Integration: Leverage CVEDIA-RT's comprehensive API bindings
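
For the event-driven pattern, a dispatch table can replace a long if/elseif chain. The sketch below is one way to structure it. The names onEvent and dispatchEvent and the "*" wildcard key are hypothetical conventions, not part of the CVEDIA-RT API.

```lua
-- Hypothetical dispatch table: event type -> handler function
local handlers = {}

-- Register a handler for an event type ("*" acts as the fallback)
function onEvent(eventType, handler)
    handlers[eventType] = handler
end

-- Route an event to its handler, or to the fallback if none is registered
function dispatchEvent(eventData)
    local handler = handlers[eventData.type] or handlers["*"]
    if handler then
        return handler(eventData)
    end
    return nil
end

onEvent("motion_detected", function(e) return "motion area " .. e.area end)
onEvent("*", function(e) return "unhandled " .. tostring(e.type) end)

print(dispatchEvent({ type = "motion_detected", area = 5200 }))
print(dispatchEvent({ type = "zone_breach" }))
```

New event types can then be added by registering a handler, without editing the dispatcher itself.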

Troubleshooting

Common Issues

Script Not Loading

-- Check if script path is correct
function initialize()
    local scriptPath = debug.getinfo(1, "S").source:sub(2)
    api.logging.LogInfo("Script loaded from: " .. scriptPath)
end

Solutions:

  • Verify script file path (absolute or relative to RT home)
  • Check file permissions and accessibility
  • Ensure script syntax is valid Lua
  • Review error messages in CVEDIA-RT logs

Memory Issues

-- Monitor memory usage
function checkMemory()
    local memUsage = collectgarbage("count")
    api.logging.LogDebug(string.format("Lua memory usage: %.2f KB", memUsage))

    if memUsage > 10000 then -- roughly 10 MB; collectgarbage("count") reports kilobytes
        api.logging.LogWarning("High memory usage detected")
        collectgarbage("collect")
    end
end

Solutions:

  • Enable automatic garbage collection
  • Call collectgarbage() periodically in long-running scripts
  • Avoid memory leaks by clearing unused variables
  • Monitor memory usage and optimize data structures

API Access Issues

-- Verify API availability
function checkAPI()
    if not api then
        error("CVEDIA-RT API not available")
    end

    if not api.logging then
        error("Logging API not available")
    end

    api.logging.LogInfo("API access verified")
end

Solutions:

  • Ensure CVEDIA-RT is properly initialized
  • Check that required plugins are loaded
  • Verify plugin dependencies and loading order
  • Review API binding registration

Performance Issues

-- Performance monitoring
local startTime = os.clock() -- CPU time for performance measurement
-- Your processing code here
local endTime = os.clock()
local duration = (endTime - startTime) * 1000 -- os.clock() returns seconds; convert to ms

if duration > 100 then -- 100ms threshold
    api.logging.LogWarning(string.format("Slow operation: %.2f ms", duration))
end

Solutions:

  • Profile your scripts using Tracy integration
  • Optimize data processing algorithms
  • Use batch processing for large datasets
  • Cache expensive computations
  • Consider moving intensive operations to C++ plugins
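
For quick measurements without a profiler, a small timing wrapper can help. The sketch below uses the standard os.clock function, which reports CPU time in seconds, and converts the difference to milliseconds. The helper name timeCall is hypothetical, and it passes through only the first return value of the timed function.

```lua
-- Hypothetical helper: run fn and return its first result plus elapsed CPU time in ms
function timeCall(fn, ...)
    local start = os.clock() -- CPU time in seconds
    local result = fn(...)
    local elapsedMs = (os.clock() - start) * 1000
    return result, elapsedMs
end

-- Stand-in workload: sum the integers from 1 to n
local function sumTo(n)
    local total = 0
    for i = 1, n do
        total = total + i
    end
    return total
end

local total, ms = timeCall(sumTo, 1000000)
print(string.format("sum=%d took %.2f ms", total, ms))
```

Because os.clock measures CPU time rather than wall-clock time, time spent waiting on I/O or on other threads is not counted.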

Debugging Tools

-- Debug helper functions
function debugPrint(data, label)
    label = label or "Debug"
    if type(data) == "table" then
        api.logging.LogDebug(string.format("%s: %s", label, tableToString(data)))
    else
        api.logging.LogDebug(string.format("%s: %s", label, tostring(data)))
    end
end

function tableToString(t)
    local result = {}
    for k, v in pairs(t) do
        if type(v) == "table" then
            table.insert(result, string.format("%s = {...}", tostring(k)))
        else
            table.insert(result, string.format("%s = %s", tostring(k), tostring(v)))
        end
    end
    return "{" .. table.concat(result, ", ") .. "}"
end

-- Stack trace helper
function printStackTrace()
    api.logging.LogDebug("Stack trace:")
    local level = 2
    while true do
        local info = debug.getinfo(level, "nSl")
        if not info then break end

        local name = info.name or "<anonymous>"
        local source = info.short_src or "<unknown>"
        local line = info.currentline or 0

        api.logging.LogDebug(string.format("  %s:%d in %s", source, line, name))
        level = level + 1
    end
end

Integration Examples

Plugin Integration

// C++ plugin integration with LuaEngine
class MyCustomPlugin : public Plugin {
public:
    void initialize() override {
        // Create LuaEngine instance
        scriptEngine_ = std::make_unique<LuaEngine>("scripts/custom_logic.lua", true);

        // Load plugin-specific bindings
        auto lua = scriptEngine_->getLua();
        lua->set_function("myPluginFunction", [this](int param) {
            return this->customFunction(param);
        });
    }

    void processData(pCValue data) override {
        // Execute Lua processing logic
        auto results = scriptEngine_->callLogic(data, "processData");

        // Handle results
        for (const auto& result : results) {
            handleProcessedData(result);
        }
    }

private:
    std::unique_ptr<LuaEngine> scriptEngine_;
};

Solution Integration

-- Integration with CVEDIA-RT solutions
local solutionAPI = api.solutions

-- Access solution-specific functionality
function initializeSolution()
    local solution = solutionAPI.getCurrentSolution()
    if solution then
        api.logging.LogInfo("Running in solution: " .. solution.name)

        -- Configure based on solution type
        if solution.type == "security" then
            setupSecurityLogic()
        elseif solution.type == "retail" then
            setupRetailLogic()
        end
    else
        api.logging.LogInfo("Running in standalone mode")
        setupStandaloneLogic()
    end
end

See Also