WriteData Plugin¶
Description¶
The WriteData plugin provides file output capabilities for CVEDIA-RT through the Output plugin system. It registers URI handlers for multiple file formats and serves as the backend for file-based data export, covering video (MP4, MPG, MPEG), structured data (JSON, TXT, CSV), and image (JPG, PNG) formats.
Key Features¶
- Multiple File Formats: Video (MP4, MPG, MPEG), data (JSON, TXT, CSV), images (JPG, PNG)
- URI-based Output: Integrated with Output plugin delegation system
- Structured Data Export: JSON, CSV with proper formatting
- File System Management: Directory creation, file rotation, cleanup
- Dynamic Path Support: Template-based file naming with variables
- Event-driven Export: Real-time data writing from analytics pipeline
- Batch Processing: Efficient handling of bulk data exports
- Metadata Preservation: Timestamp and context information retention
When to Use¶
- Recording video files from analytics output
- Exporting structured event data (JSON, CSV)
- Saving detection images and crops
- Creating audit logs and reports
- Data backup and archival
- Integration with external systems via file exchange
- Compliance and regulatory reporting
Requirements¶
Software Dependencies¶
- File system write permissions
- Sufficient disk space for data storage
- Optional: Video encoding libraries (for MP4/MPG formats)
- Optional: Image processing libraries (for JPG/PNG formats)
Storage Requirements¶
- Varies by data volume and retention period
- Video files: Significant space (GB scale)
- Data files: Moderate space (MB scale)
- Image files: Variable based on resolution and frequency
- Monitor disk usage and implement rotation policies (a rotation sketch follows this list)
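As an illustration of a simple rotation policy, the sketch below recreates a JSON export handler once per day with a date-stamped filename. It uses only the Output plugin calls documented later on this page; the scheduler that would invoke rotateDailyExport once per day is assumed and not shown.
-- Hedged sketch: daily rotation by re-registering the handler with a
-- date-stamped path. Assumes an external scheduler calls
-- rotateDailyExport() once per day (that scheduler is hypothetical).
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "RotatingExporter")
local function rotateDailyExport()
    -- Drop the previous handler (if any), then register a fresh one
    output:removeHandler("daily-json")
    local uri = "json:///data/analytics/events_" .. os.date("%Y%m%d") .. ".json"
    local handler = output:addHandler("daily-json", uri, "events", {format = "compact"})
    if not handler then
        api.logging.LogError("Failed to rotate JSON export handler")
    end
end
rotateDailyExport() -- create the initial handler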
Configuration¶
The WriteData plugin is configured through the Output plugin system using URI schemes.
Basic Configuration¶
{
  "output": {
    "handlers": {
      "json-export": {
        "uri": "json:///data/analytics/events.json",
        "sink": "events",
        "enabled": true
      },
      "video-recording": {
        "uri": "mp4:///recordings/camera1.mp4",
        "sink": "output",
        "enabled": true
      },
      "image-export": {
        "uri": "jpg:///images/detections.jpg",
        "sink": "crops",
        "enabled": true
      }
    }
  }
}
Advanced Configuration with Dynamic Paths¶
{
  "output": {
    "handlers": {
      "timestamped-json": {
        "uri": "json:///data/{{date}}/events_{{time}}.json",
        "sink": "events",
        "enabled": true,
        "config": {
          "format": "pretty",
          "compression": true
        }
      },
      "daily-csv-log": {
        "uri": "csv:///logs/{{date}}/analytics.csv",
        "sink": "analytics",
        "enabled": true,
        "config": {
          "headers": true,
          "delimiter": ",",
          "append": true
        }
      }
    }
  }
}
Configuration Schema¶
Parameter | Type | Default | Description |
---|---|---|---|
uri | string | required | File URI with scheme (json://, mp4://, jpg://, etc.) |
sink | string | "output" | Data sink to connect to |
enabled | boolean | true | Enable/disable file output |
config.format | string | "compact" | Output format for JSON (pretty or compact) |
config.compression | boolean | false | Enable file compression |
config.headers | boolean | false | Include headers in CSV files |
config.delimiter | string | "," | CSV field delimiter |
config.append | boolean | false | Append to existing files |
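As a quick illustration of the schema, the hedged sketch below passes each config.* parameter to the handler type it applies to (handler names and paths are hypothetical):
-- Illustrative only: exercising each config.* parameter from the table above.
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "SchemaDemo")
-- JSON handler: format and compression apply here
output:addHandler("schema-json", "json:///data/demo/events.json", "events", {
    format = "pretty",
    compression = false
})
-- CSV handler: headers, delimiter, and append apply here
output:addHandler("schema-csv", "csv:///data/demo/events.csv", "events", {
    headers = true,
    delimiter = ",",
    append = true
})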
URI Schemes¶
The WriteData plugin registers the following URI schemes with the Output plugin:
Video Formats¶
- mp4://path - MP4 video files
- mpg://path - MPEG video files
- mpeg://path - MPEG video files (alternative)
Data Formats¶
- json://path - JSON structured data
- txt://path - Plain text files
- csv://path - Comma-separated values
Image Formats¶
- jpg://path - JPEG image files
- png://path - PNG image files
URI Path Examples¶
json:///data/analytics/events.json # Absolute path
json://./relative/path/data.json # Relative path
json:///logs/{{date}}/events_{{time}}.json # Dynamic path with variables
mp4:///recordings/camera_{{id}}.mp4 # Video with dynamic naming
csv:///exports/daily_{{yyyy-mm-dd}}.csv # CSV with date formatting
API Reference¶
C++ API¶
The WriteData plugin implements the iface::OutputHandler interface:
class WriteDataOutputHandler : public iface::OutputHandler {
public:
// Constructor
WriteDataOutputHandler(const std::string& moduleName,
const std::string& schema,
const std::string& sink,
const std::string& path,
pCValue config);
// Factory method (registered with Output plugin)
static std::shared_ptr<iface::OutputHandler> create(
const std::string& moduleName,
const std::string& schema,
const std::string& sink,
const std::string& path,
pCValue config);
// File operations
expected<bool> write(pCValue sinkData = VAL(),
std::string dynamicPath = "") override;
void stop() override;
void close() override;
// Configuration
std::string getSink() override;
void setFormat(const std::string& format);
void setCompression(bool enabled);
};
Core WriteData Classes¶
class WriteDataImpl : public iface::WriteData {
public:
// File operations
bool writeData(const std::string& filename, pCValue data);
bool appendData(const std::string& filename, pCValue data);
bool writeStructuredData(const std::string& filename, pCValue data);
bool writeImageData(const std::string& filename, cbuffer imageData);
bool writeVideoFrame(const std::string& filename, cbuffer frameData);
// Configuration
void setOutputPath(const std::string& path);
void setFormat(const std::string& format);
void setCompression(bool enabled);
// File management
bool createDirectory(const std::string& path);
bool rotateFile(const std::string& filename, size_t maxSize);
std::vector<std::string> listFiles(const std::string& pattern);
};
Lua API¶
WriteData is accessed through the Output plugin system:
-- WriteData outputs are created through the Output plugin
local output = api.factory.output.create(instance, "FileExporter")
-- Add JSON data export handler
local jsonHandler = output:addHandler(
"json-export",
"json:///data/analytics/events.json",
"events",
{
format = "pretty",
compression = false
}
)
-- Add video recording handler
local videoHandler = output:addHandler(
"video-recording",
"mp4:///recordings/output.mp4",
"output",
{
quality = "high",
fps = 30
}
)
-- Add image export handler
local imageHandler = output:addHandler(
"image-export",
"jpg:///images/crops.jpg",
"crops",
{
quality = 85,
compression = true
}
)
Note: WriteData operates as output handlers registered with URI schemes through the Output plugin delegation system.
Examples¶
JSON Data Export¶
-- Create output instance for JSON export
local instance = api.thread.getCurrentInstance()
local output = api.factory.output.create(instance, "JSONExporter")
-- Configure JSON data export with dynamic file naming
local jsonConfig = {
format = "pretty",
compression = false
}
local jsonHandler = output:addHandler(
"event-export",
"json:///data/analytics/events_{{timestamp}}.json",
"events",
jsonConfig
)
if jsonHandler then
api.logging.LogInfo("JSON export configured: /data/analytics/events_[timestamp].json")
-- The handler will automatically receive events from the "events" sink
-- and export them as formatted JSON files
else
api.logging.LogError("Failed to create JSON export handler")
end
Video Recording¶
local output = api.factory.output.create(instance, "VideoRecorder")
-- Configure MP4 video recording
local videoConfig = {
fps = 25,
quality = "medium",
codec = "h264"
}
local timestamp = os.date("%Y%m%d_%H%M%S")
local videoHandler = output:addHandler(
"video-recording",
"mp4:///recordings/session_" .. timestamp .. ".mp4",
"output",
videoConfig
)
if videoHandler then
api.logging.LogInfo("Video recording started: /recordings/session_" .. timestamp .. ".mp4")
-- Function to stop recording
function stopRecording()
output:removeHandler("video-recording")
api.logging.LogInfo("Video recording stopped")
end
else
api.logging.LogError("Failed to start video recording")
end
CSV Data Logging¶
local output = api.factory.output.create(instance, "CSVLogger")
-- Configure CSV export with headers
local csvConfig = {
headers = true,
delimiter = ",",
append = true
}
local csvHandler = output:addHandler(
"analytics-log",
"csv:///logs/analytics_{{date}}.csv",
"analytics",
csvConfig
)
if csvHandler then
api.logging.LogInfo("CSV logging configured: /logs/analytics_[date].csv")
-- CSV files will automatically include headers and append new data
-- Dynamic date variable ensures daily log files
else
api.logging.LogError("Failed to create CSV logger")
end
Image Crop Export¶
local output = api.factory.output.create(instance, "CropExporter")
-- Configure JPEG crop export
local jpegConfig = {
quality = 90,
resize = {width = 224, height = 224},
format = "RGB"
}
local cropHandler = output:addHandler(
"detection-crops",
"jpg:///crops/{{object_class}}/crop_{{detection_id}}.jpg",
"crops",
jpegConfig
)
if cropHandler then
api.logging.LogInfo("Crop export configured: /crops/[class]/crop_[id].jpg")
-- Images will be saved with dynamic paths based on detection data
-- Each object class gets its own subdirectory
else
api.logging.LogError("Failed to create crop exporter")
end
Multi-Format Export¶
local output = api.factory.output.create(instance, "MultiFormatExporter")
-- Export the same data to multiple formats
local formats = {
{
name = "json-backup",
uri = "json:///backup/data_{{timestamp}}.json",
sink = "events",
config = {format = "compact", compression = true}
},
{
name = "csv-analysis",
uri = "csv:///analysis/events_{{date}}.csv",
sink = "events",
config = {headers = true, delimiter = ";"}
},
{
name = "txt-log",
uri = "txt:///logs/system_{{timestamp}}.txt",
sink = "events",
config = {append = true}
}
}
-- Create all export handlers
local activeHandlers = {}
for _, format in ipairs(formats) do
local handler = output:addHandler(format.name, format.uri, format.sink, format.config)
if handler then
table.insert(activeHandlers, format.name)
api.logging.LogInfo("Created handler: " .. format.name)
else
api.logging.LogError("Failed to create handler: " .. format.name)
end
end
-- Function to stop all exports
function stopAllExports()
for _, name in ipairs(activeHandlers) do
output:removeHandler(name)
api.logging.LogInfo("Stopped: " .. name)
end
activeHandlers = {}
end
Directory Organization¶
local output = api.factory.output.create(instance, "OrganizedExporter")
-- Organize outputs by date and type
local organizationConfig = {
events = "json:///data/{{yyyy}}/{{mm}}/{{dd}}/events_{{hh}}.json",
analytics = "csv:///analytics/{{yyyy}}/monthly_{{mm}}.csv",
images = "jpg:///images/{{yyyy}}/{{mm}}/{{dd}}/{{object_class}}/image_{{id}}.jpg",
videos = "mp4:///recordings/{{yyyy}}/{{mm}}/session_{{timestamp}}.mp4"
}
local handlers = {}
for dataType, uri in pairs(organizationConfig) do
local config = {
compression = (dataType == "events"), -- Compress JSON only
headers = (dataType == "analytics"), -- Headers for CSV only
quality = (dataType == "images") and 85 or nil -- JPEG quality
}
local handler = output:addHandler(
dataType .. "-export",
uri,
dataType,
config
)
if handler then
handlers[dataType] = handler
api.logging.LogInfo("Configured " .. dataType .. " export with organized directory structure")
end
end
Best Practices¶
File Organization¶
- Directory Structure:
  - Use date-based organization: /data/{{yyyy}}/{{mm}}/{{dd}}/
  - Separate by data type: /analytics/, /recordings/, /images/
  - Include metadata in paths: /{{camera_id}}/{{object_class}}/
- File Naming:
  - Include timestamps: events_{{timestamp}}.json
  - Use descriptive names: intrusion_detection_{{id}}.jpg
  - Avoid special characters and spaces
  - Include relevant metadata: camera_{{id}}_{{event_type}}.mp4
- Format Selection (a small lookup sketch follows this list):
  - JSON for structured event data
  - CSV for tabular analytics data
  - MP4 for video recordings
  - JPG for images (good compression)
  - PNG for images (lossless quality)
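The format guidance above can be captured in a small lookup; the mapping below is an illustrative sketch that mirrors the list, not an API of the plugin (the data type names are hypothetical):
-- Illustrative mapping of data type to URI scheme, following the
-- Format Selection guidance above.
local schemeFor = {
    events    = "json",  -- structured event data
    analytics = "csv",   -- tabular analytics data
    video     = "mp4",   -- video recordings
    crops     = "jpg",   -- compressed images
    masks     = "png"    -- lossless images
}
local function uriFor(dataType, path)
    local scheme = schemeFor[dataType]
    if not scheme then return nil end
    return scheme .. "://" .. path
end
-- uriFor("events", "/data/analytics/events.json")
--   -> "json:///data/analytics/events.json"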
Performance Optimization¶
- File I/O:
  - Use SSD storage for high-frequency writes
  - Implement file rotation for large datasets
  - Batch writes when possible
  - Monitor disk space usage
- Compression:
  - Enable compression for JSON files in production
  - Use appropriate JPEG quality settings (80-95)
  - Consider PNG for images requiring transparency
  - Compress video files with appropriate codecs
- Memory Management:
  - Flush buffers regularly for real-time data
  - Implement file size limits
  - Use appropriate buffer sizes
  - Monitor memory usage for large files
Security and Compliance¶
- File Permissions:
  - Set appropriate read/write permissions
  - Use dedicated service accounts
  - Implement access logging
  - Audit permissions regularly
- Data Retention:
  - Implement automatic cleanup policies
  - Archive old data appropriately
  - Comply with data retention regulations
  - Document retention policies
Troubleshooting¶
File System Issues¶
- "Permission denied"
  - Check directory write permissions: ls -la /path/to/directory
  - Verify user/group ownership: chown user:group /path
  - Ensure parent directories exist: mkdir -p /path/to/parent
  - Check SELinux/AppArmor policies if applicable
- "No such file or directory"
  - Verify the path exists and is accessible
  - Check for typos in the URI configuration
  - Ensure dynamic path variables are resolved correctly
  - Validate directory creation permissions
- "Disk full" errors
  - Monitor disk space: df -h
  - Implement file rotation policies
  - Configure cleanup scripts
  - Set up disk usage alerts
Format-Specific Issues¶
- JSON Export Problems
  - Validate JSON structure before writing
  - Check for circular references in data
  - Verify character encoding (UTF-8)
  - Test with JSON validators
- Video Recording Issues
  - Check video codec availability
  - Verify sufficient disk I/O performance
  - Monitor frame rate consistency
  - Validate video container format support
- Image Export Problems
  - Verify image format support libraries
  - Check image dimensions and bit depth
  - Validate color space conversions
  - Test compression settings
Performance Issues¶
- Slow write performance
  - Use faster storage (SSD vs HDD)
  - Optimize file buffer sizes
  - Reduce compression overhead
  - Monitor system I/O utilization
- High memory usage
  - Implement streaming writes for large files
  - Reduce buffer sizes if memory constrained
  - Monitor for memory leaks
  - Use appropriate data structures
- Network storage issues (see the retry sketch after this list)
  - Check network latency and bandwidth
  - Implement retry logic for network failures
  - Use local buffering for network storage
  - Monitor network connectivity
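For network-backed storage, handler creation can fail transiently. A minimal retry wrapper, sketched below, uses only the output:addHandler call documented above; the retry policy itself is an assumption, not built-in plugin behavior:
-- Hedged sketch: retry handler creation a few times before giving up.
local function addHandlerWithRetry(output, name, uri, sink, config, attempts)
    attempts = attempts or 3
    for i = 1, attempts do
        local handler = output:addHandler(name, uri, sink, config)
        if handler then
            return handler
        end
        api.logging.LogError("Handler creation failed (attempt " .. i .. "/" .. attempts .. "): " .. name)
    end
    return nil
end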
Integration Examples¶
External System Integration¶
# Monitor output directory for new files
inotifywait -m /data/analytics/ -e create -e moved_to |
while read -r path action file; do
echo "New file: $file"
# Process new analytics files
python process_analytics.py "$path$file"
done
Log Analysis Scripts¶
import json
import csv
from pathlib import Path
def process_json_exports(directory):
"""Process JSON exports from WriteData plugin"""
for json_file in Path(directory).glob("**/*.json"):
with open(json_file, 'r') as f:
data = json.load(f)
# Process analytics data
print(f"Processed {len(data.get('events', []))} events from {json_file}")
def convert_to_csv(json_directory, csv_output):
"""Convert JSON exports to consolidated CSV"""
with open(csv_output, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['timestamp', 'event_type', 'confidence', 'object_class'])
for json_file in Path(json_directory).glob("**/*.json"):
with open(json_file, 'r') as f:
data = json.load(f)
for event in data.get('events', []):
writer.writerow([
event.get('timestamp'),
event.get('type'),
event.get('confidence'),
event.get('object_class')
])
File Rotation Script¶
#!/bin/bash
# Rotate WriteData output files
ANALYTICS_DIR="/data/analytics"
MAX_SIZE="100M"
ARCHIVE_DIR="/archive/analytics"
find "$ANALYTICS_DIR" -name "*.json" -size +"$MAX_SIZE" | while read file; do
echo "Rotating large file: $file"
# Create archive directory
mkdir -p "$ARCHIVE_DIR/$(date +%Y/%m)"
# Compress and move file
gzip "$file"
mv "$file.gz" "$ARCHIVE_DIR/$(date +%Y/%m)/"
echo "Archived: $file.gz"
done
See Also¶
- Output Plugins Overview
- Output Plugin - Main output coordination system
- GStreamerWriter Plugin - Video streaming output
- HLS Plugin - HTTP Live Streaming
- MQTT Plugin - Real-time messaging
- REST Plugin - HTTP API export
- Dynamic Strings Documentation - Path templating
- Output Handler Scripting - Custom output processing