Apache Spark Streaming for Real-Time Analytics: Production Implementation Guide
Real-time data processing has become essential for modern applications, from fraud detection to live dashboards. Apache Spark's streaming engine makes it possible to process these continuous data streams at scale using the same APIs as batch workloads.
Combining our expertise in MQTT messaging and Apache Spark processing, this comprehensive guide demonstrates building production-ready IoT data pipeline architectures that handle millions of sensor readings with real-time analytics capabilities.
1# iot_pipeline_orchestrator.py
2import asyncio
3import json
4import logging
5from dataclasses import dataclass, asdict
6from typing import Dict, List, Optional
7from datetime import datetime, timezone
8from concurrent.futures import ThreadPoolExecutor
9import paho.mqtt.client as mqtt
10from pyspark.sql import SparkSession
11from pyspark.sql.functions import *
12from pyspark.sql.types import *
13import redis
14import influxdb_client
15from influxdb_client.client.write_api import SYNCHRONOUS
16
@dataclass
class SensorReading:
    """A single reading from an IoT sensor, as published over MQTT.

    Attributes:
        device_id: Unique identifier of the reporting device.
        sensor_type: Kind of measurement (e.g. 'temperature', 'humidity').
        timestamp: When the reading was ingested (timezone-aware UTC expected).
        value: Numeric measurement; bools are rejected by validate().
        unit: Unit string for `value` (may be empty).
        location: Coordinate mapping (e.g. {'lat': ..., 'lon': ...}) — assumed;
            the payload schema is defined by the publishing devices.
        metadata: Arbitrary extra payload fields.
    """
    device_id: str
    sensor_type: str
    timestamp: datetime
    value: float
    unit: str
    location: Dict[str, float]
    # `object` instead of the former `Dict[str, any]`: `any` is the builtin
    # function, not a type, and typing accepted it only by accident.
    metadata: Dict[str, object]

    def to_json(self) -> str:
        """Serialize to JSON, encoding the timestamp as ISO-8601."""
        data = asdict(self)
        data['timestamp'] = self.timestamp.isoformat()
        return json.dumps(data)

    @classmethod
    def from_json(cls, json_str: str) -> 'SensorReading':
        """Inverse of to_json(); raises KeyError/ValueError on bad payloads."""
        data = json.loads(json_str)
        data['timestamp'] = datetime.fromisoformat(data['timestamp'])
        return cls(**data)

    def validate(self) -> bool:
        """Return True when the reading has usable identifying and numeric data.

        Rejects empty device/sensor identifiers, non-numeric values
        (including bool, which is an int subclass), and non-datetime
        timestamps.
        """
        if not self.device_id or not self.sensor_type:
            return False
        if self.value is None or not isinstance(self.value, (int, float)):
            return False
        # bool passes the isinstance check above but is not a measurement.
        if isinstance(self.value, bool):
            return False
        if not isinstance(self.timestamp, datetime):
            return False
        return True
47
class IoTDataPipelineOrchestrator:
    """End-to-end IoT pipeline: MQTT ingestion -> async preprocessing ->
    Redis real-time cache -> anomaly detection and alerting, with InfluxDB
    for time-series persistence.

    paho-mqtt runs its network loop on a background thread, so MQTT
    callbacks hand readings to the asyncio queues via
    asyncio.run_coroutine_threadsafe (asyncio.Queue is not thread-safe and
    asyncio.create_task requires a running loop on the calling thread).
    """

    def __init__(self, config: Dict):
        """config must contain 'mqtt', 'redis' and 'influxdb' sections.

        NOTE: constructing the orchestrator connects to the MQTT broker and
        creates the Spark/Redis/InfluxDB clients immediately.
        """
        self.config = config
        self.logger = logging.getLogger(__name__)

        # External clients; populated by initialize_components().
        self.mqtt_client = None
        self.spark_session = None
        self.redis_client = None
        self.influxdb_client = None

        # Event loop running start_pipeline(); used to bridge the MQTT
        # network thread into asyncio. Set by start_pipeline().
        self._loop: Optional[asyncio.AbstractEventLoop] = None

        # Bounded queues give back-pressure between pipeline stages.
        self.raw_data_queue = asyncio.Queue(maxsize=10000)
        self.processed_data_queue = asyncio.Queue(maxsize=5000)

        # Lightweight pipeline counters (reported by batch_processing()).
        self.stats = {
            'messages_received': 0,
            'messages_processed': 0,
            'messages_failed': 0,
            'processing_time_avg': 0.0
        }

        self.initialize_components()

    def initialize_components(self):
        """Initialize all pipeline components."""
        self.setup_mqtt()
        self.setup_spark()
        self.setup_redis()
        self.setup_influxdb()

    def setup_mqtt(self):
        """Configure the MQTT client for sensor data ingestion and connect."""
        self.mqtt_client = mqtt.Client(
            client_id=f"iot_pipeline_{datetime.now().timestamp()}"
        )

        def on_connect(client, userdata, flags, rc):
            # rc == 0 means the broker accepted the connection.
            if rc == 0:
                self.logger.info("Connected to MQTT broker")
                # Subscribe to all sensor topics: sensors/<device>/<type>
                client.subscribe("sensors/+/+", qos=1)
                client.subscribe("devices/+/status", qos=1)
            else:
                self.logger.error(f"Failed to connect to MQTT broker: {rc}")

        def on_message(client, userdata, msg):
            """Handle incoming MQTT messages (runs on paho's network thread)."""
            try:
                payload = msg.payload.decode('utf-8')
                topic_parts = msg.topic.split('/')

                if len(topic_parts) >= 3:
                    device_id = topic_parts[1]
                    sensor_type = topic_parts[2]

                    sensor_data = json.loads(payload)

                    # Timestamp is assigned at ingestion time; any
                    # device-side timestamp stays in the payload metadata.
                    reading = SensorReading(
                        device_id=device_id,
                        sensor_type=sensor_type,
                        timestamp=datetime.now(timezone.utc),
                        value=sensor_data.get('value'),
                        unit=sensor_data.get('unit', ''),
                        location=sensor_data.get('location', {}),
                        metadata=sensor_data.get('metadata', {})
                    )

                    if reading.validate():
                        if self._loop is not None:
                            # asyncio.create_task would raise here: there is
                            # no running event loop on this thread. Schedule
                            # the put() onto the pipeline's loop instead.
                            asyncio.run_coroutine_threadsafe(
                                self.raw_data_queue.put(reading), self._loop
                            )
                            self.stats['messages_received'] += 1
                        else:
                            self.logger.warning(
                                "Pipeline not started yet; dropping reading"
                            )
                    else:
                        self.logger.warning(f"Invalid sensor reading: {reading}")

            except Exception as e:
                self.logger.error(f"Error processing MQTT message: {e}")
                self.stats['messages_failed'] += 1

        self.mqtt_client.on_connect = on_connect
        self.mqtt_client.on_message = on_message

        # Connect to MQTT broker
        broker_config = self.config['mqtt']
        self.mqtt_client.connect(
            broker_config['host'],
            broker_config['port'],
            broker_config['keepalive']
        )

    def setup_spark(self):
        """Initialize the Spark session used for batch/stream processing."""
        self.spark_session = SparkSession.builder \
            .appName("IoTDataPipeline") \
            .config("spark.sql.streaming.metricsEnabled", "true") \
            .config("spark.sql.adaptive.enabled", "true") \
            .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
            .config("spark.streaming.backpressure.enabled", "true") \
            .getOrCreate()

        self.spark_session.sparkContext.setLogLevel("WARN")

    def setup_redis(self):
        """Initialize Redis for caching and real-time data."""
        redis_config = self.config['redis']
        self.redis_client = redis.Redis(
            host=redis_config['host'],
            port=redis_config['port'],
            db=redis_config['db'],
            decode_responses=True
        )

    def setup_influxdb(self):
        """Initialize InfluxDB for time-series data storage."""
        influx_config = self.config['influxdb']
        self.influxdb_client = influxdb_client.InfluxDBClient(
            url=influx_config['url'],
            token=influx_config['token'],
            org=influx_config['org']
        )

    async def start_pipeline(self):
        """Start the MQTT network loop and all async pipeline stages."""
        self.logger.info("Starting IoT data pipeline...")

        # Remember the loop so MQTT-thread callbacks can schedule onto it.
        self._loop = asyncio.get_running_loop()

        self.mqtt_client.loop_start()

        tasks = [
            asyncio.create_task(self.process_raw_data()),
            asyncio.create_task(self.real_time_analytics()),
            asyncio.create_task(self.batch_processing()),
            asyncio.create_task(self.health_monitor()),
        ]
        await asyncio.gather(*tasks)

    async def process_raw_data(self):
        """Consume raw readings: preprocess, cache, and forward for analytics."""
        while True:
            try:
                reading = await self.raw_data_queue.get()

                processed_reading = await self.preprocess_data(reading)
                if processed_reading is None:
                    # Rejected by range validation; do not forward.
                    self.stats['messages_failed'] += 1
                    continue

                await self.store_realtime_data(processed_reading)
                await self.processed_data_queue.put(processed_reading)

                self.stats['messages_processed'] += 1

            except Exception as e:
                self.logger.error(f"Error processing raw data: {e}")
                self.stats['messages_failed'] += 1

    async def preprocess_data(self, reading: SensorReading) -> Optional[SensorReading]:
        """Cleanse, clamp and enrich a reading; returns None when rejected."""

        if reading.sensor_type == 'temperature':
            # Reject physically implausible temperatures (assumed deg C).
            if not (-50 <= reading.value <= 150):
                self.logger.warning(f"Temperature out of range: {reading.value}")
                return None

        elif reading.sensor_type == 'humidity':
            # Clamp relative humidity into 0-100%.
            reading.value = max(0, min(100, reading.value))

        elif reading.sensor_type == 'pressure':
            # Reject pressures outside a plausible atmospheric range (hPa).
            if not (800 <= reading.value <= 1200):
                self.logger.warning(f"Pressure out of range: {reading.value}")
                return None

        # Enrich with the registered device location if the payload had none.
        if not reading.location:
            reading.location = await self.get_device_location(reading.device_id)

        # Add processing metadata.
        reading.metadata['processed_at'] = datetime.now(timezone.utc).isoformat()
        reading.metadata['pipeline_version'] = '1.0'

        return reading

    async def get_device_location(self, device_id: str) -> Dict[str, float]:
        """Look up a device's registered location in Redis (empty if unknown)."""
        raw = self.redis_client.hgetall(f"location:{device_id}")
        try:
            return {axis: float(coord) for axis, coord in (raw or {}).items()}
        except (TypeError, ValueError):
            return {}

    async def store_realtime_data(self, reading: SensorReading):
        """Store data in Redis for real-time access."""
        if not reading:
            return

        # Latest reading per device/sensor.
        key = f"sensor:{reading.device_id}:{reading.sensor_type}"
        self.redis_client.hset(key, mapping={
            'value': reading.value,
            'timestamp': reading.timestamp.isoformat(),
            'unit': reading.unit
        })
        self.redis_client.expire(key, 3600)  # 1 hour TTL for cleanup

        # Recent-history list (newest first).
        ts_key = f"ts:{reading.device_id}:{reading.sensor_type}"
        self.redis_client.lpush(ts_key, reading.to_json())
        self.redis_client.ltrim(ts_key, 0, 100)  # keep last ~100 readings
        self.redis_client.expire(ts_key, 3600)

    async def real_time_analytics(self):
        """Consume processed readings: anomaly checks, dashboards, storage."""
        while True:
            try:
                reading = await self.processed_data_queue.get()
                if not reading:
                    continue

                anomaly = await self.detect_anomalies(reading)
                if anomaly:
                    await self.trigger_alert(reading, anomaly)

                await self.update_dashboard_metrics(reading)
                await self.store_timeseries_data(reading)

            except Exception as e:
                self.logger.error(f"Error in real-time analytics: {e}")

    async def detect_anomalies(self, reading: SensorReading) -> Optional[Dict]:
        """Z-score anomaly check against running per-device statistics."""
        device_key = f"stats:{reading.device_id}:{reading.sensor_type}"

        # Running statistics maintained by update_running_stats().
        stats = self.redis_client.hmget(device_key, 'mean', 'std', 'count')

        if stats[0] and stats[1]:  # have historical data
            mean = float(stats[0])
            std = float(stats[1])

            # Floor std to avoid division blow-up on near-constant signals.
            z_score = abs(reading.value - mean) / max(std, 0.1)

            if z_score > 3.0:  # 3-sigma rule
                # Anomalous values are NOT folded into the baseline stats.
                return {
                    'type': 'statistical_anomaly',
                    'z_score': z_score,
                    'expected_range': [mean - 2 * std, mean + 2 * std],
                    'actual_value': reading.value
                }

        # Fold the (non-anomalous) value into the running statistics.
        await self.update_running_stats(device_key, reading.value)

        return None

    async def update_running_stats(self, device_key: str, value: float) -> None:
        """Update running mean/std for a device sensor (Welford's algorithm)."""
        raw = self.redis_client.hmget(device_key, 'mean', 'm2', 'count')
        mean = float(raw[0] or 0.0)
        m2 = float(raw[1] or 0.0)
        count = int(raw[2] or 0) + 1

        delta = value - mean
        mean += delta / count
        m2 += delta * (value - mean)
        std = (m2 / count) ** 0.5 if count > 1 else 0.0

        self.redis_client.hset(device_key, mapping={
            'mean': mean,
            'std': std,
            'm2': m2,
            'count': count
        })

    async def update_dashboard_metrics(self, reading: SensorReading) -> None:
        """Publish the latest per-device value for dashboard consumers."""
        key = f"dashboard:{reading.sensor_type}"
        self.redis_client.hset(key, reading.device_id, reading.value)
        self.redis_client.expire(key, 3600)

    async def store_timeseries_data(self, reading: SensorReading) -> None:
        """Persist the reading as a point in InfluxDB."""
        point = (
            influxdb_client.Point("sensor_reading")
            .tag("device_id", reading.device_id)
            .tag("sensor_type", reading.sensor_type)
            .field("value", float(reading.value))
            .time(reading.timestamp)
        )
        write_api = self.influxdb_client.write_api(write_options=SYNCHRONOUS)
        # Bucket is optional in config; default to 'iot'.
        write_api.write(
            bucket=self.config['influxdb'].get('bucket', 'iot'),
            record=point
        )

    async def trigger_alert(self, reading: SensorReading, anomaly: Dict):
        """Publish an alert over MQTT and record it in Redis."""
        alert_data = {
            'device_id': reading.device_id,
            'sensor_type': reading.sensor_type,
            'value': reading.value,
            'timestamp': reading.timestamp.isoformat(),
            'anomaly': anomaly,
            'location': reading.location
        }

        # Publish alert via MQTT.
        alert_topic = f"alerts/{reading.device_id}/{reading.sensor_type}"
        self.mqtt_client.publish(alert_topic, json.dumps(alert_data), qos=1)

        # Redis hashes only accept flat scalar values; serialize nested dicts.
        flat_alert = {
            key: json.dumps(val) if isinstance(val, dict) else val
            for key, val in alert_data.items()
        }
        alert_key = f"alert:{reading.device_id}:{reading.timestamp.timestamp()}"
        self.redis_client.hset(alert_key, mapping=flat_alert)
        self.redis_client.expire(alert_key, 86400 * 7)  # keep for 7 days

        self.logger.warning(f"Alert triggered: {alert_data}")

    async def batch_processing(self):
        """Periodic batch hook; currently reports pipeline throughput.

        TODO(review): wire this to SparkBatchProcessor once the persisted
        data path for readings is decided.
        """
        while True:
            await asyncio.sleep(60)
            self.logger.info("Pipeline stats: %s", self.stats)

    async def health_monitor(self):
        """Periodically verify Redis/MQTT connectivity and log failures."""
        while True:
            await asyncio.sleep(30)
            try:
                self.redis_client.ping()
            except Exception as e:
                self.logger.error(f"Redis health check failed: {e}")
            if not self.mqtt_client.is_connected():
                self.logger.warning("MQTT disconnected; attempting reconnect")
                try:
                    self.mqtt_client.reconnect()
                except Exception as e:
                    self.logger.error(f"MQTT reconnect failed: {e}")
339
340# Spark-based batch processing component
class SparkBatchProcessor:
    """Offline batch analytics over persisted sensor data using Spark."""

    def __init__(self, spark_session: SparkSession, config: Dict):
        self.spark = spark_session
        self.config = config

    def create_sensor_data_schema(self):
        """Build the Spark schema matching serialized SensorReading records."""
        field_specs = [
            ("device_id", StringType(), False),
            ("sensor_type", StringType(), False),
            ("timestamp", TimestampType(), False),
            ("value", DoubleType(), False),
            ("unit", StringType(), True),
            ("location", MapType(StringType(), DoubleType()), True),
            ("metadata", MapType(StringType(), StringType()), True),
        ]
        return StructType(
            [StructField(name, dtype, nullable) for name, dtype, nullable in field_specs]
        )

    def process_hourly_aggregations(self, data_path: str, output_path: str):
        """Aggregate readings per device/sensor/hour and append as parquet."""
        readings = self.spark.read \
            .schema(self.create_sensor_data_schema()) \
            .json(data_path)

        # Bucket each reading into its hour, then compute summary stats.
        bucketed = readings.withColumn("hour", date_trunc("hour", col("timestamp")))
        aggregated = bucketed.groupBy("device_id", "sensor_type", "hour").agg(
            avg("value").alias("avg_value"),
            min("value").alias("min_value"),
            max("value").alias("max_value"),
            stddev("value").alias("std_value"),
            count("*").alias("reading_count"),
        )

        # Append partitioned output so repeated runs accumulate history.
        writer = aggregated.write.mode("append").partitionBy("sensor_type", "hour")
        writer.parquet(output_path)

    def detect_device_patterns(self, data_path: str):
        """Return per-device reading counts/averages by hour and day-of-week."""
        readings = self.spark.read \
            .schema(self.create_sensor_data_schema()) \
            .json(data_path)

        enriched = (
            readings
            .withColumn("hour_of_day", hour(col("timestamp")))
            .withColumn("day_of_week", dayofweek(col("timestamp")))
        )
        return (
            enriched
            .groupBy("device_id", "hour_of_day", "day_of_week")
            .agg(
                count("*").alias("reading_count"),
                avg("value").alias("avg_value"),
            )
        )
1# edge_processor.py
import asyncio
import json
from datetime import datetime
from typing import Dict, List, Optional

import joblib
import numpy as np
from scipy import signal
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
class EdgeDataProcessor:
    """Process data at the edge before sending to central pipeline.

    Maintains a small per-sensor-type rolling buffer and applies signal
    filtering, a data-quality score, a lightweight anomaly score, and a
    transmission gate (bandwidth optimization) to each reading.
    """

    def __init__(self, device_id: str, config: Dict):
        """config keys used (all optional): buffer_size,
        transmission_threshold, transmission_interval."""
        self.device_id = device_id
        self.config = config

        # Per-sensor-type rolling buffers of {'value', 'timestamp'} dicts.
        self.data_buffer = {}
        self.buffer_size = config.get('buffer_size', 100)

        # Optional pre-trained model. NOTE(review): the IsolationForest
        # fallback is never fitted, and the current scoring path uses
        # statistical z-scores rather than this model.
        self.anomaly_detector = None
        self.load_models()

        # Processing statistics.
        self.processed_count = 0
        self.anomalies_detected = 0

    def load_models(self):
        """Load the per-device anomaly model, falling back to a default."""
        try:
            model_path = f"models/anomaly_detector_{self.device_id}.joblib"
            self.anomaly_detector = joblib.load(model_path)
        except FileNotFoundError:
            # Create default model if none exists (unfitted until trained).
            self.anomaly_detector = IsolationForest(
                contamination=0.1,
                random_state=42
            )

    async def process_sensor_reading(self, sensor_type: str, value: float,
                                     timestamp: datetime) -> Dict:
        """Process one reading at the edge.

        Returns a dict with raw/processed values, quality and anomaly
        scores, and a should_transmit flag for the uplink scheduler.
        """
        buffer = self.data_buffer.setdefault(sensor_type, [])
        buffer.append({'value': value, 'timestamp': timestamp})

        # Keep the buffer bounded (drop oldest first).
        if len(buffer) > self.buffer_size:
            buffer.pop(0)

        # Defaults used during the cold-start phase (< 10 samples).
        processed_data = {
            'device_id': self.device_id,
            'sensor_type': sensor_type,
            'raw_value': value,
            'timestamp': timestamp,
            'processed_value': value,
            'quality_score': 1.0,
            'anomaly_score': 0.0,
            'should_transmit': True
        }

        if len(buffer) >= 10:
            filtered_value = await self.apply_signal_processing(sensor_type, value)
            processed_data['processed_value'] = filtered_value

            quality = await self.assess_data_quality(sensor_type)
            processed_data['quality_score'] = quality

            anomaly_score = await self.detect_edge_anomaly(sensor_type, filtered_value)
            processed_data['anomaly_score'] = anomaly_score

            # Pass the score we already computed so anomaly detection (and
            # its counter side effect) does not run twice per reading.
            processed_data['should_transmit'] = await self.optimize_transmission(
                sensor_type, filtered_value, quality, anomaly_score
            )

        self.processed_count += 1
        return processed_data

    async def apply_signal_processing(self, sensor_type: str,
                                      current_value: float) -> float:
        """Return a denoised value derived from the sensor's buffer."""
        values_array = np.array(
            [reading['value'] for reading in self.data_buffer[sensor_type]]
        )

        if sensor_type in ('accelerometer', 'gyroscope'):
            # Zero-phase low-pass filter for motion sensors.
            b, a = signal.butter(3, 0.1, btype='low')
            # filtfilt's default pad length is 3*max(len(a), len(b)); with
            # fewer samples than that it raises ValueError, so fall back to
            # a plain average until enough history accumulates.
            pad = 3 * max(len(a), len(b))
            if len(values_array) > pad:
                return float(signal.filtfilt(b, a, values_array)[-1])
            return float(np.mean(values_array))

        if sensor_type in ('temperature', 'humidity'):
            # Moving average for slowly varying environmental sensors.
            window_size = min(5, len(values_array))
            return float(np.mean(values_array[-window_size:]))

        # Median filter for general spike/noise reduction.
        window_size = min(3, len(values_array))
        return float(np.median(values_array[-window_size:]))

    async def assess_data_quality(self, sensor_type: str) -> float:
        """Score data quality in [0.1, 1.0]; stable signals score higher."""
        if sensor_type not in self.data_buffer or \
                len(self.data_buffer[sensor_type]) < 5:
            return 1.0

        values = [reading['value'] for reading in self.data_buffer[sensor_type][-10:]]

        std_dev = np.std(values)
        mean_val = np.mean(values)

        # Coefficient of variation, floored to avoid dividing by ~0 means.
        cv = std_dev / max(abs(mean_val), 0.001)

        # Lower CV (more stable signal) -> higher quality.
        return max(0.1, 1.0 - min(cv / 2.0, 0.9))

    async def detect_edge_anomaly(self, sensor_type: str, value: float) -> float:
        """Return a z-score-based anomaly score in [0, 1].

        Side effect: increments anomalies_detected when the score > 0.8.
        """
        if len(self.data_buffer[sensor_type]) < 20:
            return 0.0

        recent_values = [reading['value'] for reading in
                         self.data_buffer[sensor_type][-20:]]

        # Baseline excludes the current (last-appended) value.
        mean_val = np.mean(recent_values[:-1])
        std_val = np.std(recent_values[:-1])

        if std_val > 0:
            z_score = abs(value - mean_val) / std_val
            anomaly_score = min(z_score / 3.0, 1.0)  # normalize to 0-1
        else:
            anomaly_score = 0.0

        if anomaly_score > 0.8:
            self.anomalies_detected += 1

        return anomaly_score

    async def optimize_transmission(self, sensor_type: str, value: float,
                                    quality: float,
                                    anomaly_score: Optional[float] = None) -> bool:
        """Decide whether this reading should be transmitted upstream.

        anomaly_score may be supplied by the caller to avoid re-running
        anomaly detection (and double-counting its side effect); when None
        it is computed on demand, preserving the original behavior.
        """
        # Always transmit high-quality anomalies.
        if quality > 0.8:
            if anomaly_score is None:
                anomaly_score = await self.detect_edge_anomaly(sensor_type, value)
            if anomaly_score > 0.7:
                return True

        # Transmit on significant relative change vs the previous sample.
        if len(self.data_buffer[sensor_type]) >= 2:
            previous_value = self.data_buffer[sensor_type][-2]['value']
            change_threshold = self.config.get('transmission_threshold', {}).get(
                sensor_type, 0.1
            )
            relative_change = abs(value - previous_value) / max(abs(previous_value), 0.001)
            if relative_change > change_threshold:
                return True

        # Otherwise transmit periodically (every N processed readings).
        transmission_interval = self.config.get('transmission_interval', {}).get(
            sensor_type, 10
        )
        return self.processed_count % transmission_interval == 0
187
# Usage example
async def main():
    """Run the pipeline against local development services."""
    config = {
        'mqtt': {'host': 'localhost', 'port': 1883, 'keepalive': 60},
        'redis': {'host': 'localhost', 'port': 6379, 'db': 0},
        'influxdb': {
            'url': 'http://localhost:8086',
            'token': 'your-token',
            'org': 'your-org',
        },
    }

    # Construct the orchestrator (connects to backing services) and run it.
    pipeline = IoTDataPipelineOrchestrator(config)
    await pipeline.start_pipeline()


if __name__ == "__main__":
    asyncio.run(main())
This IoT data pipeline architecture provides a complete solution for handling sensor data from collection through real-time analytics. The design supports edge processing, anomaly detection, and scalable stream processing suitable for production IoT deployments.
For foundational concepts, explore our MQTT messaging tutorial and Spark performance optimization guide.