
WriteData Plugin

Description

The WriteData plugin provides file output capabilities for CVEDIA-RT through the Output plugin system. It registers URI handlers for multiple file formats and serves as the backend for file-based data export, supporting video (MP4, MPG, MPEG), structured data (JSON, TXT, CSV), and image (JPG, PNG) formats.

Key Features

  • Multiple File Formats: Video (MP4, MPG, MPEG), data (JSON, TXT, CSV), images (JPG, PNG)
  • URI-based Output: Integrated with Output plugin delegation system
  • Structured Data Export: JSON and CSV output with consistent formatting
  • File System Management: Directory creation, file rotation, cleanup
  • Dynamic Path Support: Template-based file naming with variables
  • Event-driven Export: Real-time data writing from analytics pipeline
  • Batch Processing: Efficient handling of bulk data exports
  • Metadata Preservation: Timestamp and context information retention

When to Use

  • Recording video files from analytics output
  • Exporting structured event data (JSON, CSV)
  • Saving detection images and crops
  • Creating audit logs and reports
  • Data backup and archival
  • Integration with external systems via file exchange
  • Compliance and regulatory reporting

Requirements

Software Dependencies

  • File system write permissions
  • Sufficient disk space for data storage
  • Optional: Video encoding libraries (for MP4/MPG formats)
  • Optional: Image processing libraries (for JPG/PNG formats)

Storage Requirements

  • Varies by data volume and retention period
  • Video files: Significant space (GB scale)
  • Data files: Moderate space (MB scale)
  • Image files: Variable based on resolution and frequency
  • Monitor disk usage and implement rotation policies
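
As a rough sizing example (assuming a single 4 Mbps H.264 stream): 4 Mbps ÷ 8 × 3600 s ≈ 1.8 GB per camera-hour, or roughly 43 GB per camera-day before rotation or cleanup.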

Configuration

The WriteData plugin is configured through the Output plugin system using URI schemes.

Basic Configuration

{
  "output": {
    "handlers": {
      "json-export": {
        "uri": "json:///data/analytics/events.json",
        "sink": "events",
        "enabled": true
      },
      "video-recording": {
        "uri": "mp4:///recordings/camera1.mp4",
        "sink": "output",
        "enabled": true
      },
      "image-export": {
        "uri": "jpg:///images/detections.jpg",
        "sink": "crops",
        "enabled": true
      }
    }
  }
}

Advanced Configuration with Dynamic Paths

{
  "output": {
    "handlers": {
      "timestamped-json": {
        "uri": "json:///data/{{date}}/events_{{time}}.json",
        "sink": "events",
        "enabled": true,
        "config": {
          "format": "pretty",
          "compression": true
        }
      },
      "daily-csv-log": {
        "uri": "csv:///logs/{{date}}/analytics.csv",
        "sink": "analytics",
        "enabled": true,
        "config": {
          "headers": true,
          "delimiter": ",",
          "append": true
        }
      }
    }
  }
}

Configuration Schema

Parameter            Type     Default      Description
uri                  string   (required)   File URI with scheme (json://, mp4://, jpg://, etc.)
sink                 string   "output"     Data sink to connect to
enabled              boolean  true         Enable or disable the file output
config.format        string   "compact"    JSON output format ("pretty" or "compact")
config.compression   boolean  false        Enable file compression
config.headers       boolean  false        Include column headers in CSV files
config.delimiter     string   ","          CSV field delimiter
config.append        boolean  false        Append to existing files instead of overwriting
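
As a minimal sketch of how these fields are used (assuming an output instance created as in the Lua API section below), the config.* parameters map onto the table passed to addHandler:

local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "SchemaExample")

-- The fourth argument mirrors the config.* fields in the schema above
local handler = output:addHandler(
    "daily-csv",                           -- handler name
    "csv:///logs/{{date}}/analytics.csv",  -- uri (required)
    "analytics",                           -- sink
    {
        headers = true,    -- config.headers
        delimiter = ",",   -- config.delimiter
        append = true      -- config.append
    }
)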

URI Schemes

The WriteData plugin registers the following URI schemes with the Output plugin:

Video Formats

  • mp4://path - MP4 video files
  • mpg://path - MPEG video files
  • mpeg://path - MPEG video files (alternative)

Data Formats

  • json://path - JSON structured data
  • txt://path - Plain text files
  • csv://path - Comma-separated values

Image Formats

  • jpg://path - JPEG image files
  • png://path - PNG image files

URI Path Examples

json:///data/analytics/events.json          # Absolute path
json://./relative/path/data.json            # Relative path
json:///logs/{{date}}/events_{{time}}.json  # Dynamic path with variables
mp4:///recordings/camera_{{id}}.mp4         # Video with dynamic naming
csv:///exports/daily_{{yyyy-mm-dd}}.csv     # CSV with date formatting
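
Template variables such as {{date}} and {{id}} are resolved when data is written. Equivalently, URIs can be assembled in Lua before registration. A hedged sketch (the camera list and handler names are illustrative; only addHandler from the Lua API below is used):

local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "PerCameraRecorder")

-- Build explicit per-camera URIs instead of relying on {{id}} resolution
local cameras = {"cam1", "cam2", "cam3"}
for _, id in ipairs(cameras) do
    output:addHandler(
        "recording-" .. id,
        "mp4:///recordings/camera_" .. id .. ".mp4",
        "output",
        {fps = 25}
    )
end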

API Reference

C++ API

The WriteData plugin implements the iface::OutputHandler interface:

class WriteDataOutputHandler : public iface::OutputHandler {
public:
    // Constructor
    WriteDataOutputHandler(const std::string& moduleName,
                          const std::string& schema, 
                          const std::string& sink,
                          const std::string& path, 
                          pCValue config);

    // Factory method (registered with Output plugin)
    static std::shared_ptr<iface::OutputHandler> create(
        const std::string& moduleName,
        const std::string& schema,
        const std::string& sink, 
        const std::string& path,
        pCValue config);

    // File operations
    expected<bool> write(pCValue sinkData = VAL(), 
                        std::string dynamicPath = "") override;
    void stop() override;
    void close() override;

    // Configuration
    std::string getSink() override;
    void setFormat(const std::string& format);
    void setCompression(bool enabled);
};

Core WriteData Classes

class WriteDataImpl : public iface::WriteData {
public:
    // File operations
    bool writeData(const std::string& filename, pCValue data);
    bool appendData(const std::string& filename, pCValue data);
    bool writeStructuredData(const std::string& filename, pCValue data);
    bool writeImageData(const std::string& filename, cbuffer imageData);
    bool writeVideoFrame(const std::string& filename, cbuffer frameData);

    // Configuration
    void setOutputPath(const std::string& path);
    void setFormat(const std::string& format);
    void setCompression(bool enabled);

    // File management
    bool createDirectory(const std::string& path);
    bool rotateFile(const std::string& filename, size_t maxSize);
    std::vector<std::string> listFiles(const std::string& pattern);
};

Lua API

WriteData is accessed through the Output plugin system:

-- WriteData outputs are created through the Output plugin
local output = api.factory.output.create(instance, "FileExporter")

-- Add JSON data export handler
local jsonHandler = output:addHandler(
    "json-export",
    "json:///data/analytics/events.json",
    "events",
    {
        format = "pretty",
        compression = false
    }
)

-- Add video recording handler
local videoHandler = output:addHandler(
    "video-recording",
    "mp4:///recordings/output.mp4",
    "output",
    {
        quality = "high",
        fps = 30
    }
)

-- Add image export handler
local imageHandler = output:addHandler(
    "image-export", 
    "jpg:///images/crops.jpg",
    "crops",
    {
        quality = 85,
        compression = true
    }
)

Note: WriteData operates as a set of output handlers registered for URI schemes through the Output plugin delegation system.

Examples

JSON Data Export

-- Create output instance for JSON export
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "JSONExporter")

-- Configure JSON data export with dynamic file naming
local jsonConfig = {
    format = "pretty",
    compression = false
}

local jsonHandler = output:addHandler(
    "event-export",
    "json:///data/analytics/events_{{timestamp}}.json",
    "events",
    jsonConfig
)

if jsonHandler then
    api.logging.LogInfo("JSON export configured: /data/analytics/events_[timestamp].json")

    -- The handler will automatically receive events from the "events" sink
    -- and export them as formatted JSON files
else
    api.logging.LogError("Failed to create JSON export handler")
end

Video Recording

local output = api.factory.output.create(instance, "VideoRecorder")

-- Configure MP4 video recording
local videoConfig = {
    fps = 25,
    quality = "medium",
    codec = "h264"
}

local timestamp = os.date("%Y%m%d_%H%M%S")
local videoHandler = output:addHandler(
    "video-recording",
    "mp4:///recordings/session_" .. timestamp .. ".mp4",
    "output",
    videoConfig
)

if videoHandler then
    api.logging.LogInfo("Video recording started: /recordings/session_" .. timestamp .. ".mp4")

    -- Function to stop recording
    function stopRecording()
        output:removeHandler("video-recording")
        api.logging.LogInfo("Video recording stopped")
    end
else
    api.logging.LogError("Failed to start video recording")
end

CSV Data Logging

local output = api.factory.output.create(instance, "CSVLogger")

-- Configure CSV export with headers
local csvConfig = {
    headers = true,
    delimiter = ",",
    append = true
}

local csvHandler = output:addHandler(
    "analytics-log",
    "csv:///logs/analytics_{{date}}.csv",
    "analytics",
    csvConfig
)

if csvHandler then
    api.logging.LogInfo("CSV logging configured: /logs/analytics_[date].csv")

    -- CSV files will automatically include headers and append new data
    -- Dynamic date variable ensures daily log files
else
    api.logging.LogError("Failed to create CSV logger")
end

Image Crop Export

local output = api.factory.output.create(instance, "CropExporter")

-- Configure JPEG crop export
local jpegConfig = {
    quality = 90,
    resize = {width = 224, height = 224},
    format = "RGB"
}

local cropHandler = output:addHandler(
    "detection-crops",
    "jpg:///crops/{{object_class}}/crop_{{detection_id}}.jpg",
    "crops",
    jpegConfig
)

if cropHandler then
    api.logging.LogInfo("Crop export configured: /crops/[class]/crop_[id].jpg")

    -- Images will be saved with dynamic paths based on detection data
    -- Each object class gets its own subdirectory
else
    api.logging.LogError("Failed to create crop exporter")
end

Multi-Format Export

local output = api.factory.output.create(instance, "MultiFormatExporter")

-- Export the same data to multiple formats
local formats = {
    {
        name = "json-backup",
        uri = "json:///backup/data_{{timestamp}}.json",
        sink = "events",
        config = {format = "compact", compression = true}
    },
    {
        name = "csv-analysis", 
        uri = "csv:///analysis/events_{{date}}.csv",
        sink = "events",
        config = {headers = true, delimiter = ";"}
    },
    {
        name = "txt-log",
        uri = "txt:///logs/system_{{timestamp}}.txt", 
        sink = "events",
        config = {append = true}
    }
}

-- Create all export handlers
local activeHandlers = {}
for _, format in ipairs(formats) do
    local handler = output:addHandler(format.name, format.uri, format.sink, format.config)
    if handler then
        table.insert(activeHandlers, format.name)
        api.logging.LogInfo("Created handler: " .. format.name)
    else
        api.logging.LogError("Failed to create handler: " .. format.name)
    end
end

-- Function to stop all exports
function stopAllExports()
    for _, name in ipairs(activeHandlers) do
        output:removeHandler(name)
        api.logging.LogInfo("Stopped: " .. name)
    end
    activeHandlers = {}
end

Directory Organization

local output = api.factory.output.create(instance, "OrganizedExporter")

-- Organize outputs by date and type
local organizationConfig = {
    events = "json:///data/{{yyyy}}/{{mm}}/{{dd}}/events_{{hh}}.json",
    analytics = "csv:///analytics/{{yyyy}}/monthly_{{mm}}.csv", 
    images = "jpg:///images/{{yyyy}}/{{mm}}/{{dd}}/{{object_class}}/image_{{id}}.jpg",
    videos = "mp4:///recordings/{{yyyy}}/{{mm}}/session_{{timestamp}}.mp4"
}

local handlers = {}
for dataType, uri in pairs(organizationConfig) do
    local config = {
        compression = (dataType == "events"), -- Compress JSON only
        headers = (dataType == "analytics"),  -- Headers for CSV only
        quality = (dataType == "images") and 85 or nil -- JPEG quality
    }

    local handler = output:addHandler(
        dataType .. "-export",
        uri,
        dataType,
        config
    )

    if handler then
        handlers[dataType] = handler
        api.logging.LogInfo("Configured " .. dataType .. " export with organized directory structure")
    end
end

Best Practices

File Organization

  1. Directory Structure:

    • Use date-based organization: /data/{{yyyy}}/{{mm}}/{{dd}}/
    • Separate by data type: /analytics/, /recordings/, /images/
    • Include metadata in paths: /{{camera_id}}/{{object_class}}/
  2. File Naming:

    • Include timestamps: events_{{timestamp}}.json
    • Use descriptive names: intrusion_detection_{{id}}.jpg
    • Avoid special characters and spaces (see the sanitizer sketch after this list)
    • Include relevant metadata: camera_{{id}}_{{event_type}}.mp4
  3. Format Selection:

    • JSON for structured event data
    • CSV for tabular analytics data
    • MP4 for video recordings
    • JPG for images (good compression)
    • PNG for images (lossless quality)
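
A hedged helper for the naming rules above (not part of the plugin API; objectClass and detectionId are illustrative placeholders):

-- Replace anything outside [A-Za-z0-9_-] before embedding metadata in a path
local function sanitize(name)
    return (name:gsub("[^%w%-_]", "_"))
end

local uri = "jpg:///crops/" .. sanitize(objectClass)
         .. "/crop_" .. sanitize(detectionId) .. ".jpg"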

Performance Optimization

  1. File I/O:

    • Use SSD storage for high-frequency writes
    • Implement file rotation for large datasets (see the rotation sketch after this list)
    • Batch writes when possible
    • Monitor disk space usage
  2. Compression:

    • Enable compression for JSON files in production
    • Use appropriate JPEG quality settings (80-95)
    • Consider PNG for images requiring transparency
    • Compress video files with appropriate codecs
  3. Memory Management:

    • Flush buffers regularly for real-time data
    • Implement file size limits
    • Use appropriate buffer sizes
    • Monitor memory usage for large files
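
One way to bound file growth is time-based rotation: periodically remove a handler and re-register it with a fresh timestamped path. A sketch under those assumptions (the handler name, sink, and schedule are illustrative; only addHandler/removeHandler from the Lua API are used):

-- Rotate the JSON export to a new timestamped file
local function rotateJsonExport(output)
    output:removeHandler("event-export")
    local stamp = os.date("%Y%m%d_%H%M%S")
    return output:addHandler(
        "event-export",
        "json:///data/analytics/events_" .. stamp .. ".json",
        "events",
        {format = "compact", compression = true}
    )
end

-- Call rotateJsonExport(output) on a timer, e.g. once per hour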

Security and Compliance

  1. File Permissions:

    • Set appropriate read/write permissions
    • Use dedicated service accounts
    • Implement access logging
    • Regular permission audits
  2. Data Retention:

    • Implement automatic cleanup policies (see the File Rotation Script under Integration Examples)
    • Archive old data appropriately
    • Comply with data retention regulations
    • Document retention policies

Troubleshooting

File System Issues

  1. "Permission denied"

    • Check directory write permissions: ls -la /path/to/directory
    • Verify user/group ownership: chown user:group /path
    • Ensure parent directories exist: mkdir -p /path/to/parent
    • Check SELinux/AppArmor policies if applicable
  2. "No such file or directory"

    • Verify path exists and is accessible
    • Check for typos in URI configuration
    • Ensure dynamic path variables are resolved correctly
    • Validate directory creation permissions
  3. "Disk full" errors

    • Monitor disk space: df -h
    • Implement file rotation policies
    • Configure cleanup scripts
    • Set up disk usage alerts

Format-Specific Issues

  1. JSON Export Problems

    • Validate JSON structure before writing
    • Check for circular references in data
    • Verify character encoding (UTF-8)
    • Test with JSON validators
  2. Video Recording Issues

    • Check video codec availability
    • Verify sufficient disk I/O performance
    • Monitor frame rate consistency
    • Validate video container format support
  3. Image Export Problems

    • Verify image format support libraries
    • Check image dimensions and bit depth
    • Validate color space conversions
    • Test compression settings

Performance Issues

  1. Slow write performance

    • Use faster storage (SSD vs HDD)
    • Optimize file buffer sizes
    • Reduce compression overhead
    • Monitor system I/O utilization
  2. High memory usage

    • Implement streaming writes for large files
    • Reduce buffer sizes if memory constrained
    • Monitor for memory leaks
    • Use appropriate data structures
  3. Network storage issues

    • Check network latency and bandwidth
    • Implement retry logic for network failures
    • Use local buffering for network storage
    • Monitor network connectivity

Integration Examples

External System Integration

# Monitor output directory for new files
inotifywait -m /data/analytics/ -e create -e moved_to |
while read -r path action file; do
    echo "New file: $file"
    # Process new analytics files
    python process_analytics.py "$path$file"
done

Log Analysis Scripts

import json
import csv
from pathlib import Path

def process_json_exports(directory):
    """Process JSON exports from WriteData plugin"""
    for json_file in Path(directory).glob("**/*.json"):
        with open(json_file, 'r') as f:
            data = json.load(f)
            # Process analytics data
            print(f"Processed {len(data.get('events', []))} events from {json_file}")

def convert_to_csv(json_directory, csv_output):
    """Convert JSON exports to consolidated CSV"""
    with open(csv_output, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['timestamp', 'event_type', 'confidence', 'object_class'])

        for json_file in Path(json_directory).glob("**/*.json"):
            with open(json_file, 'r') as f:
                data = json.load(f)
                for event in data.get('events', []):
                    writer.writerow([
                        event.get('timestamp'),
                        event.get('type'),
                        event.get('confidence'),
                        event.get('object_class')
                    ])
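
# Example usage (paths are illustrative):
#   process_json_exports("/data/analytics")
#   convert_to_csv("/data/analytics", "/tmp/consolidated.csv")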

File Rotation Script

#!/bin/bash
# Rotate WriteData output files

ANALYTICS_DIR="/data/analytics"
MAX_SIZE="100M"
ARCHIVE_DIR="/archive/analytics"

find "$ANALYTICS_DIR" -name "*.json" -size +"$MAX_SIZE" | while read file; do
    echo "Rotating large file: $file"

    # Create archive directory
    mkdir -p "$ARCHIVE_DIR/$(date +%Y/%m)"

    # Compress and move file
    gzip "$file"
    mv "$file.gz" "$ARCHIVE_DIR/$(date +%Y/%m)/"

    echo "Archived: $file.gz"
done
