
Stream Processing with eKuiper

FlowMQ integrates with eKuiper, an open-source, lightweight stream processing engine, so you can process, analyze, and transform streaming data in real time. eKuiper reads FlowMQ topics as streams, evaluates SQL rules against each message, and sends the results back to FlowMQ or to other sinks.

Overview

eKuiper is lightweight IoT edge analytics software designed to run on resource-constrained edge devices. Combined with FlowMQ, it provides:

  • Real-time Stream Processing: Process data as it flows through FlowMQ
  • Multi-protocol Support: Handle data from MQTT, Kafka, AMQP, and other protocols
  • SQL-like Queries: Use familiar SQL syntax for stream processing
  • Edge Computing: Process data at the edge for reduced latency
  • Rule Engine: Create complex processing rules and transformations

Architecture

Data Flow

  1. Data Ingestion: Messages arrive via MQTT, Kafka, AMQP, or other protocols
  2. Stream Processing: eKuiper processes the data using SQL-like queries
  3. Data Transformation: Apply filters, aggregations, and transformations
  4. Output: Send processed data to various sinks (databases, APIs, other topics)

Components

  • FlowMQ: Message broker and routing engine
  • eKuiper Engine: Stream processing engine
  • Streams: Data sources (MQTT topics, Kafka topics, etc.)
  • Rules: Processing logic and transformations
  • Sinks: Output destinations

Getting Started

Installation

bash
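# Run eKuiper and point its default MQTT source connection at FlowMQ
docker run -d --name kuiper \
  -p 9081:9081 \
  -e MQTT_SOURCE__DEFAULT__SERVER=tcp://your-namespace.flowmq.io:1883 \
  lfedge/ekuiper:1.11.0

The command reaches FlowMQ through eKuiper's built-in MQTT source. The subscription topic (for example sensors/#) is not a container setting; it is set per stream through the stream's DATASOURCE property. Port 9081 exposes eKuiper's REST API, which is used to create streams and rules.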
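Streams and rules are created through that REST API. The following is a minimal Python sketch, using only the standard library, that builds the POST /streams request defining a schemaless JSON stream named sensors on the sensors/# topic. It assumes eKuiper is reachable at localhost:9081, and the helper name build_stream_request is purely illustrative; nothing is sent until you call urllib.request.urlopen yourself.

```python
import json
import urllib.request

# Assumption: eKuiper's REST API is reachable on the port published by the docker run command.
EKUIPER_URL = "http://localhost:9081"


def build_stream_request(name: str, topic: str) -> urllib.request.Request:
    """Build (but do not send) a POST /streams request for a schemaless JSON MQTT stream."""
    sql = (
        f"CREATE STREAM {name} () "
        f'WITH (DATASOURCE="{topic}", FORMAT="JSON", TYPE="mqtt")'
    )
    body = json.dumps({"sql": sql}).encode("utf-8")
    return urllib.request.Request(
        f"{EKUIPER_URL}/streams",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    req = build_stream_request("sensors", "sensors/#")
    # Sending requires a running eKuiper instance:
    # with urllib.request.urlopen(req) as resp:
    #     print(resp.status, resp.read().decode())
    print(req.get_method(), req.full_url)
    print(req.data.decode())
```

The empty () column list keeps the stream schemaless, so every field in the incoming JSON is available to rules.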

Basic Configuration

yaml
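# etc/mqtt_source.yaml: default connection used by every MQTT stream
default:
  server: "tcp://your-namespace.flowmq.io:1883"
  qos: 1

Topics and sinks are not set in this file. Each stream names its topic (sensors/#) in its DATASOURCE property, and each rule configures its output (topic processed/sensors, QoS 1) in an mqtt action.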
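Once a stream exists, a rule connects it to an output. This sketch builds the JSON body for eKuiper's POST /rules endpoint with a single mqtt action that publishes to processed/sensors at QoS 1. The rule id hot_sensors and the helper build_rule are hypothetical, and the broker address is the placeholder used throughout this page.

```python
import json

# Placeholder broker address; replace with your FlowMQ namespace.
FLOWMQ_SERVER = "tcp://your-namespace.flowmq.io:1883"


def build_rule(rule_id: str, sql: str, out_topic: str) -> dict:
    """Return a rule definition whose single action publishes results to FlowMQ over MQTT."""
    return {
        "id": rule_id,
        "sql": sql,
        "actions": [
            {"mqtt": {"server": FLOWMQ_SERVER, "topic": out_topic, "qos": 1}}
        ],
    }


rule = build_rule(
    "hot_sensors",  # hypothetical rule id
    "SELECT temperature, humidity, timestamp FROM sensors WHERE temperature > 25",
    "processed/sensors",
)
print(json.dumps(rule, indent=2))
```

POSTing the printed body to /rules on port 9081 creates the rule.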

Stream Processing Examples

Basic Data Filtering

sql
-- Filter temperature readings above 25°C
SELECT temperature, humidity, timestamp
FROM sensors
WHERE temperature > 25

Data Aggregation

sql
-- Calculate average temperature per room every 5 minutes
SELECT 
    room_id,
    AVG(temperature) as avg_temp,
    MAX(temperature) as max_temp,
    MIN(temperature) as min_temp,
    COUNT(*) as reading_count
FROM sensors
WHERE temperature IS NOT NULL
GROUP BY room_id, TUMBLINGWINDOW(ss, 300)
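To make the window semantics concrete, here is a plain-Python sketch of what the query computes: every reading falls into exactly one non-overlapping 300-second bucket, and each (bucket, room) pair yields one avg/max/min/count row. It assumes readings arrive as (timestamp_seconds, room_id, temperature) tuples and that buckets align to multiples of 300 seconds; eKuiper's own window alignment may differ.

```python
from collections import defaultdict
from typing import Dict, Iterable, Optional, Tuple

WINDOW_SECONDS = 300  # same size as TUMBLINGWINDOW(ss, 300)

Reading = Tuple[int, str, Optional[float]]  # (timestamp_seconds, room_id, temperature)


def tumbling_aggregate(
    readings: Iterable[Reading],
) -> Dict[Tuple[int, str], Tuple[float, float, float, int]]:
    """Return {(window_start, room_id): (avg, max, min, count)} over non-overlapping windows."""
    buckets = defaultdict(list)
    for ts, room_id, temp in readings:
        if temp is None:
            continue  # mirrors WHERE temperature IS NOT NULL
        window_start = ts - ts % WINDOW_SECONDS  # each reading lands in exactly one window
        buckets[(window_start, room_id)].append(temp)
    return {
        key: (sum(temps) / len(temps), max(temps), min(temps), len(temps))
        for key, temps in buckets.items()
    }


readings = [(0, "r1", 20.0), (100, "r1", 22.0), (299, "r1", None), (300, "r1", 30.0), (50, "r2", 18.0)]
for key, row in sorted(tumbling_aggregate(readings).items()):
    print(key, row)
```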

Data Transformation

sql
-- Convert temperature from Celsius to Fahrenheit
-- (9.0 keeps the division in floating point even if temperature arrives as an integer)
SELECT 
    sensor_id,
    temperature * 9.0 / 5 + 32 as temperature_f,
    humidity,
    timestamp
FROM sensors
WHERE temperature IS NOT NULL

Complex Event Processing

sql
-- Detect high temperature alerts
SELECT 
    sensor_id,
    temperature,
    humidity,
    timestamp,
    CASE 
        WHEN temperature > 30 THEN 'CRITICAL'
        WHEN temperature > 25 THEN 'WARNING'
        ELSE 'NORMAL'
    END as alert_level
FROM sensors
WHERE temperature IS NOT NULL
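CASE branches are tested top to bottom and the first match wins, so the stricter threshold must come first; with strict > comparisons, exactly 25 and exactly 30 land in the lower band. A small Python mirror of the expression, with the illustrative function name alert_level, makes the boundaries easy to check:

```python
from typing import Optional


def alert_level(temperature: Optional[float]) -> Optional[str]:
    """Python mirror of the CASE expression; branches are checked in order."""
    if temperature is None:
        return None  # such rows never reach SELECT because of WHERE temperature IS NOT NULL
    if temperature > 30:
        return "CRITICAL"
    if temperature > 25:
        return "WARNING"
    return "NORMAL"


for t in (25.0, 27.5, 30.0, 31.0):
    print(t, alert_level(t))
```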

Integration Patterns

MQTT to MQTT Processing

sql
-- Process sensor data and publish to different topics
SELECT 
    sensor_id,
    temperature,
    humidity,
    timestamp
FROM sensors
WHERE temperature IS NOT NULL

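Configuration: in eKuiper the source is a stream and the sink is an action on the rule. Define the stream with CREATE STREAM sensors () WITH (DATASOURCE="sensors/#", FORMAT="JSON", TYPE="mqtt"), then create the rule with an mqtt action that publishes to FlowMQ:

json
{
  "id": "processed_sensors",
  "sql": "SELECT sensor_id, temperature, humidity, timestamp FROM sensors WHERE temperature IS NOT NULL",
  "actions": [
    {
      "mqtt": {
        "server": "tcp://your-namespace.flowmq.io:1883",
        "topic": "processed/sensors"
      }
    }
  ]
}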
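The sensors/# subscription is an MQTT topic filter: + matches exactly one level, and # matches the parent level plus any number of levels below it. This matters for MQTT-to-MQTT pipelines, because the output topic processed/sensors does not match sensors/#, so the rule never re-consumes its own results. A minimal Python matcher, assuming the filter is already well formed (for example, # appears only as the last level) and using the illustrative name topic_matches, shows the rules:

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if topic matches the MQTT subscription filter (+ and # wildcards)."""
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    # Per the MQTT spec, a leading wildcard does not match topics that start with '$'.
    if topic.startswith("$") and f_levels[0] in ("+", "#"):
        return False
    for i, level in enumerate(f_levels):
        if level == "#":
            return True  # matches the parent level and everything below it
        if i >= len(t_levels):
            return False  # filter has more levels than the topic
        if level != "+" and level != t_levels[i]:
            return False
    return len(f_levels) == len(t_levels)


for t in ("sensors", "sensors/room1/temp", "processed/sensors"):
    print(t, topic_matches("sensors/#", t))
```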

Multi-Protocol Processing

sql
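-- eKuiper has no UNION or subqueries in FROM: create one rule per stream,
-- tag each row with its origin, and point both rules at the same sink.

-- Rule 1: readings from the MQTT stream
SELECT 'mqtt' as source, sensor_id, temperature, timestamp
FROM mqtt_sensors
WHERE temperature IS NOT NULL

-- Rule 2: readings from the Kafka stream
SELECT 'kafka' as source, sensor_id, temperature, timestamp
FROM kafka_sensors
WHERE temperature IS NOT NULL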

Time-Series Analysis

sql
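-- Moving average per sensor: each time a reading arrives, emit the average of
-- each sensor's readings from the preceding 60 seconds (eKuiper's OVER clause
-- does not support ORDER BY or ROWS BETWEEN, so a sliding window is used instead)
SELECT
    sensor_id,
    AVG(temperature) as moving_avg_60s,
    COUNT(*) as readings_in_window
FROM sensors
WHERE temperature IS NOT NULL
GROUP BY sensor_id, SLIDINGWINDOW(ss, 60)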
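A time-based moving average keeps, for each sensor, only the readings from the last N seconds and re-averages whenever a new reading arrives. The sketch below, with the hypothetical class name SlidingAverage and a 60-second look-back, shows that bookkeeping; it assumes timestamps are in seconds and arrive in order for each sensor.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Tuple


class SlidingAverage:
    """Per-sensor average of the readings seen in the last `window` seconds.

    Assumption: a reading stays in the window while now - ts < window.
    """

    def __init__(self, window: float = 60.0):
        self.window = window
        self.buffers: Dict[str, Deque[Tuple[float, float]]] = defaultdict(deque)

    def update(self, sensor_id: str, ts: float, temperature: float) -> float:
        buf = self.buffers[sensor_id]
        buf.append((ts, temperature))
        # Evict readings that have slid out of the look-back window;
        # the reading just appended always survives, so buf is never empty here.
        while buf[0][0] <= ts - self.window:
            buf.popleft()
        return sum(t for _, t in buf) / len(buf)


avg = SlidingAverage(60)
print(avg.update("s1", 0, 20.0), avg.update("s1", 30, 22.0), avg.update("s1", 61, 26.0))
```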

Advanced Features

Window Functions

sql
-- Hopping window: 60-second windows, emitted every 10 seconds
-- (SLIDINGWINDOW instead emits a result on every incoming event)
SELECT 
    sensor_id,
    AVG(temperature) as avg_temp,
    STDDEV(temperature) as temp_stddev,
    COUNT(*) as reading_count
FROM sensors
GROUP BY sensor_id, HOPPINGWINDOW(ss, 60, 10)
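A window of 60 seconds that advances every 10 seconds is what eKuiper calls a hopping window (HOPPINGWINDOW(ss, 60, 10)), and its windows overlap: each reading contributes to size / hop = 6 results. This sketch lists the [start, end) windows that contain a given timestamp, assuming windows start at multiples of the hop; the function name hopping_windows is illustrative.

```python
from typing import List, Tuple


def hopping_windows(ts: int, size: int = 60, hop: int = 10) -> List[Tuple[int, int]]:
    """Return the [start, end) hopping windows that contain timestamp ts, oldest first."""
    windows = []
    start = ts - ts % hop  # latest window start at or before ts
    while start > ts - size:  # equivalent to start + size > ts: window still covers ts
        windows.append((start, start + size))
        start -= hop
    return sorted(windows)


print(hopping_windows(65))
```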

Pattern Matching

sql
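-- Detect a temperature spike: a reading above 30 whose previous reading from
-- the same sensor was above 25. eKuiper has no PATTERN ... WITHIN clause, so
-- this compares consecutive readings with the lag() analytic function.
SELECT
    sensor_id,
    temperature,
    timestamp
FROM sensors
WHERE lag(temperature) OVER (PARTITION BY sensor_id) > 25
    AND temperature > 30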
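The intended pattern, a reading above 25 °C followed by a reading above 30 °C on the same sensor within five minutes, can also be checked outside the engine, for example in a downstream consumer. This Python sketch keeps the timestamp of each sensor's most recent reading above 25 and reports a match when a later reading above 30 arrives within 300 seconds. The input shape (sensor_id, timestamp_seconds, temperature), ordered by time within each sensor, and the name detect_spikes are assumptions.

```python
from typing import Dict, List, Tuple

WITHIN_SECONDS = 300  # five minutes


def detect_spikes(readings: List[Tuple[str, int, float]]) -> List[Tuple[str, int, int]]:
    """Return (sensor_id, warm_ts, spike_ts) for each >25 reading followed by >30 within 5 min."""
    last_warm: Dict[str, int] = {}  # sensor_id -> ts of the most recent reading above 25
    matches = []
    for sensor_id, ts, temp in readings:
        start = last_warm.get(sensor_id)
        # Check against an earlier reading before recording the current one.
        if temp > 30 and start is not None and ts - start <= WITHIN_SECONDS:
            matches.append((sensor_id, start, ts))
        if temp > 25:
            last_warm[sensor_id] = ts
    return matches


print(detect_spikes([("s1", 0, 26.0), ("s1", 100, 31.0), ("s2", 0, 26.0), ("s2", 400, 31.0)]))
```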

Data Enrichment

sql
-- Enrich sensor data with location information
-- (locations should be defined as a TABLE; joining two streams requires a window)
SELECT 
    s.sensor_id,
    s.temperature,
    s.humidity,
    l.room_name,
    l.floor,
    s.timestamp
FROM sensors s
JOIN locations l ON s.sensor_id = l.sensor_id
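In a stream-table join, each arriving event is looked up against the current contents of the table, and an inner join drops events with no match. This Python sketch mimics that with a dictionary; the LOCATIONS contents and the enrich function are hypothetical stand-ins for a locations table.

```python
from typing import Dict, List

# Hypothetical lookup data standing in for a `locations` table.
LOCATIONS: Dict[str, Dict[str, object]] = {
    "s-101": {"room_name": "Lab A", "floor": 1},
    "s-202": {"room_name": "Office 3", "floor": 2},
}


def enrich(events: List[dict]) -> List[dict]:
    """Inner join on sensor_id: attach room_name and floor, drop unmatched events."""
    out = []
    for event in events:
        loc = LOCATIONS.get(event["sensor_id"])
        if loc is None:
            continue  # no matching row, so an inner join emits nothing
        out.append({**event, **loc})
    return out


print(enrich([{"sensor_id": "s-101", "temperature": 22.5}, {"sensor_id": "s-999", "temperature": 19.0}]))
```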

Use Cases

IoT Sensor Processing

sql
-- Process IoT sensor data for smart building
SELECT 
    sensor_id,
    room_id,
    temperature,
    humidity,
    CASE 
        WHEN temperature > 26 THEN 'COOLING_NEEDED'
        WHEN temperature < 18 THEN 'HEATING_NEEDED'
        ELSE 'COMFORTABLE'
    END as hvac_action,
    timestamp
FROM building_sensors
WHERE temperature IS NOT NULL AND humidity IS NOT NULL

Financial Data Processing

sql
-- Track tick-to-tick price changes per symbol with the lag() analytic function.
-- eKuiper's OVER clause accepts PARTITION BY but not ORDER BY or ROWS BETWEEN,
-- so compute moving averages with a windowed rule (see Time-Series Analysis).
-- price_change > 0 means UP, < 0 means DOWN, 0 means SAME (NULL on the first tick).
SELECT
    symbol,
    price,
    price - lag(price) OVER (PARTITION BY symbol) as price_change,
    timestamp
FROM stock_prices
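A per-symbol sketch of the two quantities this use case is after, a 20-tick moving average and the direction of each tick relative to the previous one, looks like this in Python. The PriceTracker class is illustrative; as with a SQL comparison against a NULL previous price falling through to ELSE, the first tick of each symbol is reported as SAME.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Optional, Tuple


class PriceTracker:
    """Per-symbol moving average over the last `window` ticks plus tick direction."""

    def __init__(self, window: int = 20):
        self.ticks: Dict[str, Deque[float]] = defaultdict(lambda: deque(maxlen=window))

    def update(self, symbol: str, price: float) -> Tuple[float, str]:
        ticks = self.ticks[symbol]
        prev: Optional[float] = ticks[-1] if ticks else None
        ticks.append(price)  # deque(maxlen=...) discards the oldest tick automatically
        if prev is None or price == prev:
            direction = "SAME"
        elif price > prev:
            direction = "UP"
        else:
            direction = "DOWN"
        return sum(ticks) / len(ticks), direction


tracker = PriceTracker()
for p in (10.0, 12.0, 11.0):
    print(tracker.update("ACME", p))
```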

Log Analysis

sql
-- Analyze application logs
SELECT 
    level,
    service,
    COUNT(*) as error_count,
    AVG(cast(response_time, "float")) as avg_response_time
FROM application_logs
WHERE level IN ('ERROR', 'WARN')
GROUP BY level, service, TUMBLINGWINDOW(ss, 60)

Performance Optimization

Memory Management

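Memory use is bounded per rule. The bufferLength rule option (default 1024) sets how many messages can be buffered in memory; once the limit is reached, the sink stops accepting new messages until the backlog drains:

json
{
  "options": {
    "bufferLength": 1024
  }
}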

Parallel Processing

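eKuiper has no PARALLEL clause; parallelism is a rule option. The concurrency option sets how many instances of each processing phase (source read, projection, sink) the rule runs:

json
{
  "id": "sensors_parallel",
  "sql": "SELECT sensor_id, temperature, humidity, timestamp FROM sensors WHERE temperature IS NOT NULL",
  "actions": [{"log": {}}],
  "options": {
    "concurrency": 4
  }
}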

Monitoring and Debugging

Metrics Collection

sql
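-- Per-minute message count and value ranges
-- (processing_time is assumed to be a field carried in each message;
-- eKuiper's own per-rule metrics are available from GET /rules/{id}/status)
SELECT 
    COUNT(*) as message_count,
    AVG(cast(processing_time, "float")) as avg_processing_time,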
    MAX(temperature) as max_temp,
    MIN(temperature) as min_temp
FROM sensors
GROUP BY TUMBLINGWINDOW(ss, 60)

Error Handling

sql
-- Tag each reading with a data-quality label (bad values are flagged, not dropped)
SELECT 
    sensor_id,
    temperature,
    CASE 
        WHEN temperature IS NULL THEN 'MISSING_TEMPERATURE'
        WHEN temperature < -50 OR temperature > 100 THEN 'INVALID_TEMPERATURE'
        ELSE 'VALID'
    END as data_quality,
    timestamp
FROM sensors

Best Practices

  1. Optimize Queries

    • Use appropriate window sizes
    • Filter data early in the pipeline
    • Avoid complex joins on high-volume streams
  2. Handle Data Quality

    • Validate input data
    • Handle missing or invalid values
    • Implement error recovery
  3. Monitor Performance

    • Track processing latency
    • Monitor memory usage
    • Set up alerts for failures
  4. Scale Appropriately

    • Use parallel processing for high-volume streams
    • Distribute processing across multiple instances
    • Implement load balancing

Integration with FlowMQ Features

Protocol Interoperability

eKuiper can process data from any FlowMQ-supported protocol:

  • MQTT: Real-time sensor data
  • Kafka: High-throughput event streams
  • AMQP: Enterprise messaging
  • HTTP: REST API data

Message Routing

sql
-- Route processed data to different destinations based on content
SELECT 
    sensor_id,
    temperature,
    humidity,
    CASE 
        WHEN temperature > 30 THEN 'alerts/critical'
        WHEN temperature > 25 THEN 'alerts/warning'
        ELSE 'data/normal'
    END as topic,
    timestamp
FROM sensors
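The computed topic column only routes messages if the sink reads it. A minimal sketch, assuming eKuiper's dynamic sink properties let the mqtt action take its topic from each result row through the template {{.topic}}, pairs a Python mirror of the routing thresholds with such a rule definition. The rule id route_by_temperature and the function route_topic are hypothetical.

```python
import json


def route_topic(temperature: float) -> str:
    """Same thresholds, in the same order, as the CASE expression that computes `topic`."""
    if temperature > 30:
        return "alerts/critical"
    if temperature > 25:
        return "alerts/warning"
    return "data/normal"


# Assumption: "{{.topic}}" is resolved per result row by the MQTT sink (dynamic property).
routing_rule = {
    "id": "route_by_temperature",  # hypothetical rule id
    "sql": (
        "SELECT sensor_id, temperature, humidity, "
        "CASE WHEN temperature > 30 THEN 'alerts/critical' "
        "WHEN temperature > 25 THEN 'alerts/warning' ELSE 'data/normal' END as topic, "
        "timestamp FROM sensors"
    ),
    "actions": [{"mqtt": {"server": "tcp://your-namespace.flowmq.io:1883", "topic": "{{.topic}}"}}],
}

print(route_topic(27.0))
print(json.dumps(routing_rule, indent=2))
```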

Next Steps

Future Enhancements

Stream processing capabilities in FlowMQ will continue to evolve with:

  • Advanced Analytics: Machine learning integration
  • Visual Rule Builder: GUI for creating processing rules
  • Real-time Dashboards: Live monitoring and visualization
  • Enhanced Scalability: Distributed processing across clusters
  • More Connectors: Additional data source and sink support