Stream Processing with eKuiper
FlowMQ integrates with eKuiper, an open-source, lightweight stream processing engine. With this integration you can filter, aggregate, and transform streaming data from multiple sources in real time.
Overview
eKuiper is lightweight IoT edge analytics software that runs on a variety of edge devices. When integrated with FlowMQ, it provides:
- Real-time Stream Processing: Process data as it flows through FlowMQ
- Multi-protocol Support: Handle data from MQTT, Kafka, AMQP, and other protocols
- SQL-like Queries: Use familiar SQL syntax for stream processing
- Edge Computing: Process data at the edge for reduced latency
- Rule Engine: Create complex processing rules and transformations
Architecture
Data Flow
- Data Ingestion: Messages arrive via MQTT, Kafka, AMQP, or other protocols
- Stream Processing: eKuiper processes the data using SQL-like queries
- Data Transformation: Apply filters, aggregations, and transformations
- Output: Send processed data to various sinks (databases, APIs, other topics)
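The four stages above can be sketched in plain Python to show how data moves through them. This is only an illustration of the flow, not eKuiper code: the function names, the 25-degree threshold, and the list used as a sink are all assumptions made for the example.

```python
import json

def ingest(raw_messages):
    """Data ingestion: decode JSON payloads as they arrive from a topic."""
    for raw in raw_messages:
        yield json.loads(raw)

def process(readings, threshold=25):
    """Stream processing: keep only readings above the threshold."""
    return (r for r in readings
            if r.get("temperature") is not None and r["temperature"] > threshold)

def transform(readings):
    """Data transformation: add a Fahrenheit field to each reading."""
    for r in readings:
        yield {**r, "temperature_f": r["temperature"] * 9 / 5 + 32}

def output(readings, sink):
    """Output: hand each processed reading to a sink (here, a plain list)."""
    for r in readings:
        sink.append(r)

raw = ['{"sensor_id": "s1", "temperature": 30}',
       '{"sensor_id": "s2", "temperature": 20}']
sink = []
output(transform(process(ingest(raw))), sink)
print(sink)
```

Each stage is a generator, so a reading flows through all four steps before the next one is decoded, which loosely mirrors how a streaming engine handles messages one at a time.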
Components
- FlowMQ: Message broker and routing engine
- eKuiper Engine: Stream processing engine
- Streams: Data sources (MQTT topics, Kafka topics, etc.)
- Rules: Processing logic and transformations
- Sinks: Output destinations
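As a rough sketch of how these components fit together, the snippet below builds the two JSON payloads that eKuiper's REST API (port 9081 in the Docker example on this page) generally expects: a stream definition and a rule with one MQTT sink. The rule id `hot_sensors` is hypothetical, the broker address is the placeholder used elsewhere on this page, and the payloads are only printed, not sent.

```python
import json

BROKER = "tcp://your-namespace.flowmq.io:1883"  # placeholder address from this page

# Stream: binds the MQTT topic filter "sensors/#" to a stream named "sensors".
stream_request = {
    "sql": 'CREATE STREAM sensors () WITH (DATASOURCE="sensors/#", FORMAT="JSON", TYPE="mqtt")'
}

# Rule: the processing logic (SQL) plus one or more sinks, listed under "actions".
rule_request = {
    "id": "hot_sensors",  # hypothetical rule name
    "sql": "SELECT temperature, humidity, timestamp FROM sensors WHERE temperature > 25",
    "actions": [
        {"mqtt": {"server": BROKER, "topic": "processed/sensors", "qos": 1}}
    ],
}

print(json.dumps(stream_request, indent=2))
print(json.dumps(rule_request, indent=2))
```

Assuming a default eKuiper setup, the first payload would be posted to `/streams` and the second to `/rules`; check the eKuiper REST documentation for your version before relying on the exact fields.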
Getting Started
Installation
```bash
# Run eKuiper in Docker with its default MQTT source pointed at FlowMQ.
# The topic filter (for example sensors/#) is set when the stream is created.
docker run -d --name kuiper \
  -p 9081:9081 \
  -e MQTT_SOURCE__DEFAULT__SERVER="tcp://your-namespace.flowmq.io:1883" \
  lfedge/ekuiper:1.11.0
```
Basic Configuration
```yaml
# eKuiper configuration
mqtt:
  source:
    server: "tcp://your-namespace.flowmq.io:1883"
    topic: "sensors/#"
    qos: 1
  sink:
    server: "tcp://your-namespace.flowmq.io:1883"
    topic: "processed/sensors"
    qos: 1
```
Stream Processing Examples
Basic Data Filtering
```sql
-- Filter temperature readings above 25°C
SELECT temperature, humidity, timestamp
FROM sensors
WHERE temperature > 25
```
Data Aggregation
```sql
-- Calculate average temperature per room every 5 minutes
SELECT
  room_id,
  AVG(temperature) as avg_temp,
  MAX(temperature) as max_temp,
  MIN(temperature) as min_temp,
  COUNT(*) as reading_count
FROM sensors
WHERE temperature IS NOT NULL
GROUP BY room_id, TUMBLINGWINDOW(ss, 300)
```
Data Transformation
```sql
-- Convert temperature from Celsius to Fahrenheit
SELECT
  sensor_id,
  temperature * 9/5 + 32 as temperature_f,
  humidity,
  timestamp
FROM sensors
WHERE temperature IS NOT NULL
```
Complex Event Processing
```sql
-- Detect high temperature alerts
SELECT
  sensor_id,
  temperature,
  humidity,
  timestamp,
  CASE
    WHEN temperature > 30 THEN 'CRITICAL'
    WHEN temperature > 25 THEN 'WARNING'
    ELSE 'NORMAL'
  END as alert_level
FROM sensors
WHERE temperature IS NOT NULL
```
Integration Patterns
MQTT to MQTT Processing
```sql
-- Process sensor data and publish to different topics
SELECT
  sensor_id,
  temperature,
  humidity,
  timestamp
FROM sensors
WHERE temperature IS NOT NULL
```
Configuration:
```yaml
sources:
  - type: mqtt
    name: sensors
    server: "tcp://your-namespace.flowmq.io:1883"
    topic: "sensors/#"
sinks:
  - type: mqtt
    name: processed_sensors
    server: "tcp://your-namespace.flowmq.io:1883"
    topic: "processed/sensors"
```
Multi-Protocol Processing
```sql
-- Process data from multiple sources
SELECT
  source,
  sensor_id,
  temperature,
  timestamp
FROM (
  SELECT 'mqtt' as source, sensor_id, temperature, timestamp FROM mqtt_sensors
  UNION ALL
  SELECT 'kafka' as source, sensor_id, temperature, timestamp FROM kafka_sensors
)
WHERE temperature IS NOT NULL
```
Time-Series Analysis
```sql
-- Calculate moving average temperature
SELECT
  sensor_id,
  temperature,
  AVG(temperature) OVER (
    PARTITION BY sensor_id
    ORDER BY timestamp
    ROWS BETWEEN 9 PRECEDING AND CURRENT ROW
  ) as moving_avg_10,
  timestamp
FROM sensors
WHERE temperature IS NOT NULL
```
Advanced Features
Window Functions
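Queries on this page group events with time windows such as `TUMBLINGWINDOW(ss, 300)`. To make the difference between a tumbling window (fixed, non-overlapping) and a hopping window (fixed length, advancing by a smaller step) concrete, here is a plain-Python simulation. It is a sketch under simplifying assumptions: timestamps are integer seconds, windows are aligned to zero, and empty windows are skipped, so eKuiper's actual alignment and emission timing may differ.

```python
def tumbling_windows(events, size):
    """Group (timestamp, value) events into fixed, non-overlapping windows."""
    windows = {}
    for ts, value in events:
        start = (ts // size) * size
        windows.setdefault(start, []).append(value)
    return [(start, start + size, vals) for start, vals in sorted(windows.items())]

def hopping_windows(events, size, hop):
    """Emit a window of length `size` every `hop` seconds; windows overlap."""
    if not events:
        return []
    last_ts = max(ts for ts, _ in events)
    result = []
    end = hop
    while end - size <= last_ts:
        start = end - size
        vals = [v for ts, v in events if start <= ts < end]
        if vals:  # skip windows that contain no events
            result.append((start, end, vals))
        end += hop
    return result

events = [(5, 20.0), (15, 22.0), (25, 24.0), (65, 30.0)]
for start, end, vals in tumbling_windows(events, 60):
    print(f"tumbling [{start}, {end}): avg={sum(vals) / len(vals):.1f}")
for start, end, vals in hopping_windows(events, 60, 30):
    print(f"hopping  [{start}, {end}): avg={sum(vals) / len(vals):.1f}")
```

With a tumbling window each reading is counted once; with a hopping window the same reading can contribute to several overlapping results, which smooths the output at the cost of more emitted rows.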
```sql
-- Hopping window aggregation: a 60-second window evaluated every 10 seconds
SELECT
  sensor_id,
  AVG(temperature) as avg_temp,
  STDDEV(temperature) as temp_stddev,
  COUNT(*) as reading_count
FROM sensors
GROUP BY sensor_id, HOPPINGWINDOW(ss, 60, 10)
```
Pattern Matching
```sql
-- Detect temperature spikes: a reading above 30 that follows a reading
-- above 25 from the same sensor (lag() returns the previous value)
SELECT
  sensor_id,
  temperature,
  timestamp
FROM sensors
WHERE temperature > 30
  AND lag(temperature) OVER (PARTITION BY sensor_id) > 25
```
Data Enrichment
```sql
-- Enrich sensor data with location information
SELECT
  s.sensor_id,
  s.temperature,
  s.humidity,
  l.room_name,
  l.floor,
  s.timestamp
FROM sensors s
JOIN locations l ON s.sensor_id = l.sensor_id
```
Use Cases
IoT Sensor Processing
```sql
-- Process IoT sensor data for smart building
SELECT
  sensor_id,
  room_id,
  temperature,
  humidity,
  CASE
    WHEN temperature > 26 THEN 'COOLING_NEEDED'
    WHEN temperature < 18 THEN 'HEATING_NEEDED'
    ELSE 'COMFORTABLE'
  END as hvac_action,
  timestamp
FROM building_sensors
WHERE temperature IS NOT NULL AND humidity IS NOT NULL
```
Financial Data Processing
```sql
-- Process stock price data
SELECT
  symbol,
  price,
  AVG(price) OVER (
    PARTITION BY symbol
    ORDER BY timestamp
    ROWS BETWEEN 19 PRECEDING AND CURRENT ROW
  ) as moving_avg_20,
  CASE
    WHEN price > LAG(price, 1) OVER (PARTITION BY symbol ORDER BY timestamp) THEN 'UP'
    WHEN price < LAG(price, 1) OVER (PARTITION BY symbol ORDER BY timestamp) THEN 'DOWN'
    ELSE 'SAME'
  END as price_direction,
  timestamp
FROM stock_prices
```
Log Analysis
```sql
-- Analyze application logs
SELECT
  level,
  service,
  COUNT(*) as error_count,
  AVG(CAST(response_time AS FLOAT)) as avg_response_time
FROM application_logs
WHERE level IN ('ERROR', 'WARN')
GROUP BY level, service, TUMBLINGWINDOW(ss, 60)
```
Performance Optimization
Memory Management
```yaml
# eKuiper configuration for memory optimization
mqtt:
  source:
    buffer_size: 1024
    buffer_tick: 1000
  sink:
    buffer_size: 1024
    buffer_tick: 1000
```
Parallel Processing
```sql
-- Parallelism is not part of the query text; in eKuiper it is typically set
-- through the rule's options (for example "concurrency": 4)
SELECT
  sensor_id,
  temperature,
  humidity,
  timestamp
FROM sensors
WHERE temperature IS NOT NULL
```
Monitoring and Debugging
Metrics Collection
```sql
-- Monitor processing performance
SELECT
  COUNT(*) as message_count,
  AVG(CAST(processing_time AS FLOAT)) as avg_processing_time,
  MAX(temperature) as max_temp,
  MIN(temperature) as min_temp
FROM sensors
GROUP BY TUMBLINGWINDOW(ss, 60)
```
Error Handling
```sql
-- Handle and log processing errors
SELECT
  sensor_id,
  temperature,
  CASE
    WHEN temperature IS NULL THEN 'MISSING_TEMPERATURE'
    WHEN temperature < -50 OR temperature > 100 THEN 'INVALID_TEMPERATURE'
    ELSE 'VALID'
  END as data_quality,
  timestamp
FROM sensors
```
Best Practices
Optimize Queries
- Use appropriate window sizes
- Filter data early in the pipeline
- Avoid complex joins on high-volume streams
Handle Data Quality
- Validate input data
- Handle missing or invalid values
- Implement error recovery
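As one possible way to validate readings before they reach a rule, the sketch below mirrors the thresholds from the Error Handling query on this page (missing value, or outside -50 to 100). The function name and the sample readings are made up for the example.

```python
def data_quality(reading):
    """Classify a reading using the thresholds from the Error Handling query."""
    temperature = reading.get("temperature")
    if temperature is None:
        return "MISSING_TEMPERATURE"
    if temperature < -50 or temperature > 100:
        return "INVALID_TEMPERATURE"
    return "VALID"

readings = [
    {"sensor_id": "s1", "temperature": 21.5},
    {"sensor_id": "s2"},
    {"sensor_id": "s3", "temperature": 250},
]

# Keep valid readings; collect the rest with a reason so they can be logged.
valid = [r for r in readings if data_quality(r) == "VALID"]
rejected = [(r["sensor_id"], data_quality(r))
            for r in readings if data_quality(r) != "VALID"]
print(valid)
print(rejected)
```

Keeping the rejected readings together with a reason, rather than dropping them silently, makes it easier to spot a misbehaving sensor later.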
Monitor Performance
- Track processing latency
- Monitor memory usage
- Set up alerts for failures
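eKuiper exposes per-rule metrics through its REST API (a rule status endpoint under `/rules/{id}/status`). The sketch below shows one way such a status document could be reduced to a few numbers worth alerting on. The sample dictionary is illustrative only: real key names depend on the rule and the eKuiper version, and the matching on `process_latency` and `exceptions_total` is an assumption about how those keys are named.

```python
def summarize_status(status):
    """Pull latency and exception counters out of a rule-status dictionary."""
    latency = {k: v for k, v in status.items() if "process_latency" in k}
    exceptions = sum(v for k, v in status.items() if k.endswith("exceptions_total"))
    return {
        "running": status.get("status") == "running",
        "max_latency": max(latency.values(), default=0),
        "exceptions": exceptions,
    }

# Illustrative sample only; not captured from a real eKuiper instance.
sample_status = {
    "status": "running",
    "source_sensors_0_records_in_total": 120,
    "source_sensors_0_exceptions_total": 0,
    "source_sensors_0_process_latency_us": 40,
    "op_filter_0_exceptions_total": 2,
    "op_filter_0_process_latency_us": 15,
}

summary = summarize_status(sample_status)
print(summary)
if not summary["running"] or summary["exceptions"] > 0:
    print("ALERT: rule needs attention")
```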
Scale Appropriately
- Use parallel processing for high-volume streams
- Distribute processing across multiple instances
- Implement load balancing
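For parallelism within a single eKuiper instance, rules accept an `options` object. The helper below is a minimal sketch that attaches `concurrency` and `bufferLength` to a rule definition; the helper name, the rule id, and the chosen values are assumptions for illustration, and the payload is only printed rather than sent to eKuiper.

```python
import json

def scaled_rule(rule_id, sql, actions, concurrency=4, buffer_length=1024):
    """Return a rule definition with per-rule scaling options attached."""
    return {
        "id": rule_id,
        "sql": sql,
        "actions": actions,
        "options": {
            "concurrency": concurrency,     # instances per processing step
            "bufferLength": buffer_length,  # in-memory message buffer per step
        },
    }

rule = scaled_rule(
    "sensors_passthrough",  # hypothetical rule name
    "SELECT sensor_id, temperature, humidity, timestamp "
    "FROM sensors WHERE temperature IS NOT NULL",
    [{"mqtt": {"server": "tcp://your-namespace.flowmq.io:1883",
               "topic": "processed/sensors"}}],
)
print(json.dumps(rule, indent=2))
```

Higher concurrency trades memory and ordering guarantees for throughput, so it is worth measuring before raising it on rules that depend on event order.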
Integration with FlowMQ Features
Protocol Interoperability
eKuiper can process data from any FlowMQ-supported protocol:
- MQTT: Real-time sensor data
- Kafka: High-throughput event streams
- AMQP: Enterprise messaging
- HTTP: REST API data
Message Routing
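The query in this section computes a `topic` column for each reading. The sketch below restates that routing decision in Python and shows one way an MQTT sink could consume it, assuming eKuiper's template-style dynamic sink properties (`{{.topic}}`) with `sendSingle` enabled. Treat the sink configuration as an assumption to verify against your eKuiper version.

```python
import json

def route_topic(temperature):
    """Mirror of the CASE expression: pick a destination topic per reading."""
    if temperature > 30:
        return "alerts/critical"
    if temperature > 25:
        return "alerts/warning"
    return "data/normal"

# Assumed sink configuration: the topic is a template filled from each result row.
routing_action = {
    "mqtt": {
        "server": "tcp://your-namespace.flowmq.io:1883",
        "topic": "{{.topic}}",
        "sendSingle": True,
    }
}

for t in (31, 27, 20):
    print(t, "->", route_topic(t))
print(json.dumps(routing_action, indent=2))
```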
```sql
-- Route processed data to different destinations based on content
SELECT
  sensor_id,
  temperature,
  humidity,
  CASE
    WHEN temperature > 30 THEN 'alerts/critical'
    WHEN temperature > 25 THEN 'alerts/warning'
    ELSE 'data/normal'
  END as topic,
  timestamp
FROM sensors
```
Next Steps
- Learn about Supported Protocols
- Explore eKuiper Documentation
- Check Stream Processing Examples
Future Enhancements
Stream processing capabilities in FlowMQ will continue to evolve with:
- Advanced Analytics: Machine learning integration
- Visual Rule Builder: GUI for creating processing rules
- Real-time Dashboards: Live monitoring and visualization
- Enhanced Scalability: Distributed processing across clusters
- More Connectors: Additional data source and sink support