Install Apache Spark 2 on Ubuntu 16.04 and macOS: Complete Setup Guide
Two earlier posts covered installing Apache Spark 0.8.0 and Apache Spark 1.1.0 on Ubuntu 12.04; this guide focuses on building production streaming applications with current Spark releases.
Real-time data processing has become essential for modern applications, from fraud detection to live dashboards. Apache Spark Streaming provides a powerful framework for building scalable, fault-tolerant streaming applications that process continuous data streams with the same APIs you use for batch processing.
This comprehensive guide builds upon our foundational Apache Spark performance tuning concepts to cover production-ready streaming implementations that can handle millions of events per second with guaranteed delivery and exactly-once processing semantics.
Apache Spark Streaming operates on a micro-batch architecture, where continuous data streams are divided into small batches processed using Spark’s core engine. This approach provides the benefits of batch processing (fault tolerance, exactly-once semantics) with near real-time latency.
DStream (Discretized Stream): The fundamental abstraction representing a continuous stream of data as a sequence of RDDs.
Receivers: Components that receive data from external sources and store it in Spark’s memory for processing.
Checkpointing: Mechanism for storing stream metadata and RDD data to enable fault recovery.
Output Operations: Actions that send processed stream data to external systems.
While this guide focuses on DStream-based Spark Streaming, it’s important to understand when to use each approach:
Use DStream-based Streaming when: you maintain legacy applications built on the RDD API, or you need low-level control over receivers and per-batch processing.
Use Structured Streaming when: you are building new applications and want the declarative DataFrame API, event-time processing with watermarks, and end-to-end exactly-once guarantees with supported sinks.
1import org.apache.spark.streaming._
2import org.apache.spark.streaming.kafka010._
3import org.apache.kafka.clients.consumer.ConsumerRecord
4import org.apache.kafka.common.serialization.StringDeserializer
5import org.apache.spark.sql.SparkSession
6import org.apache.spark.streaming.dstream.InputDStream
7
/**
 * Production Lambda Architecture implementation.
 *
 * Speed layer: consumes Kafka topics in 10-second micro-batches, computes
 * windowed per-event-type aggregates, and pushes them to fast-access storage.
 * The same results are appended to the batch layer so a periodic batch job
 * can recompute authoritative views.
 */
object LambdaArchitectureStreaming {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ProductionLambdaStreaming")
      // Throttle ingestion automatically when processing falls behind.
      .config("spark.streaming.backpressure.enabled", "true")
      .config("spark.streaming.kafka.maxRatePerPartition", "1000")
      .config("spark.streaming.receiver.maxRate", "10000")
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    // Checkpointing is mandatory here: the pipeline uses window operations.
    ssc.checkpoint("hdfs://namenode:9000/streaming/checkpoints")

    // Kafka consumer configuration; offsets are tracked by the checkpoint,
    // so Kafka auto-commit stays disabled.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092,broker2:9092,broker3:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-streaming-lambda",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("user-events", "transaction-events", "system-events")
    // FIX: the spark-streaming-kafka-0-10 API exposes
    // LocationStrategies.PreferConsistent and ConsumerStrategies.Subscribe;
    // `PreferConsistentBrokers` and a bare `Subscribe` do not exist there.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    // Real-time processing layer (speed layer).
    val realTimeMetrics = processRealTimeEvents(stream)

    realTimeMetrics.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Fast-access storage (Redis, HBase) for serving recent results.
        storeRealTimeResults(rdd)
        // Append to batch storage so the batch layer can reconcile later.
        storeToBatchLayer(rdd)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Parses, validates and aggregates raw Kafka records over a 5-minute
   * window sliding every 30 seconds, then enriches the (type, sum) pairs.
   */
  def processRealTimeEvents(stream: InputDStream[ConsumerRecord[String, String]]) = {
    stream
      .map(record => parseEvent(record.value()))
      .filter(event => isValidEvent(event))
      .window(Minutes(5), Seconds(30)) // 5-minute sliding window, 30-second slide
      .map(event => (event.eventType, event.value))
      .reduceByKey(_ + _)
      .transform { rdd =>
        // Batch-level enrichment hook (can use broadcast dimension tables).
        enrichWithDimensions(rdd)
      }
  }

  /** Deserializes a JSON payload into an [[Event]]; schema-specific. */
  def parseEvent(json: String): Event = {
    Event.fromJson(json)
  }

  /** Minimal sanity check: positive timestamp and a non-empty event type. */
  def isValidEvent(event: Event): Boolean = {
    event.timestamp > 0 && event.eventType.nonEmpty
  }

  /**
   * Joins the aggregated pairs with dimension data.
   * NOTE(review): `lookupDimension` is not defined in this snippet; for small
   * dimension tables a broadcast variable would avoid per-record lookups.
   */
  def enrichWithDimensions(rdd: org.apache.spark.rdd.RDD[(String, Double)]) = {
    rdd.map { case (eventType, value) =>
      val enrichedData = lookupDimension(eventType)
      (eventType, value, enrichedData)
    }
  }

  /**
   * Writes each (eventType, value, enrichment) record to fast storage.
   * One connection per partition amortizes connection setup cost.
   */
  def storeRealTimeResults(rdd: org.apache.spark.rdd.RDD[_]): Unit = {
    rdd.foreachPartition { partition =>
      val connection = createRedisConnection()
      try {
        partition.foreach {
          // FIX: `record._1` does not compile on an existential element type;
          // destructure the enriched (key, value, enrichment) tuple instead.
          case (key, value, _) =>
            connection.set(s"realtime:$key", value.toString)
        }
      } finally {
        // FIX: close in a finally block so a failed write cannot leak the
        // connection.
        connection.close()
      }
    }
  }

  /** Appends the batch's records to durable storage for the batch layer. */
  def storeToBatchLayer(rdd: org.apache.spark.rdd.RDD[_]): Unit = {
    rdd.saveAsTextFile(s"hdfs://namenode:9000/batch-data/${System.currentTimeMillis()}")
  }
}
111
/** Immutable event record flowing through the streaming pipeline. */
case class Event(
  eventType: String,
  value: Double,
  timestamp: Long,
  userId: String
)

object Event {
  /**
   * Parses a JSON payload into an [[Event]].
   * Placeholder implementation: ignores the input and returns a fixed
   * sample event stamped with the current time; swap in Jackson, Gson or
   * a similar library for production.
   */
  def fromJson(json: String): Event = {
    val now = System.currentTimeMillis()
    Event(eventType = "sample", value = 1.0, timestamp = now, userId = "user1")
  }
}
1import org.apache.spark.streaming._
2import org.apache.spark.streaming.dstream.DStream
3
/**
 * Event Sourcing pattern implementation.
 * Maintains complete event history while providing materialized views
 * (user profiles, stream-wide aggregates) refreshed per batch, with a
 * separate command stream handled in CQRS style.
 */
object EventSourcingStreaming {

  /**
   * Wires the full pipeline: stateful event fold, materialized-view
   * generation, persistence, and command handling.
   * NOTE(review): the helper factories (createKafkaEventStream,
   * createCommandStream, parseEventRecord, generateAnalyticsView,
   * persistMaterializedViews, processCommands) are defined elsewhere.
   */
  def setupEventSourcingPipeline(ssc: StreamingContext): Unit = {
    val eventStream = createKafkaEventStream(ssc)
    val commandStream = createCommandStream(ssc)

    // Fold events into per-key state (updateStateByKey requires checkpointing).
    val processedEvents = eventStream
      .map(parseEventRecord)
      .updateStateByKey(updateEventState _)

    val userProfileView = generateUserProfileView(processedEvents)
    val aggregateView = generateAggregateView(processedEvents)
    val analyticsView = generateAnalyticsView(processedEvents)

    persistMaterializedViews(userProfileView, aggregateView, analyticsView)

    // Handle commands separately from queries (CQRS).
    processCommands(commandStream, processedEvents)
  }

  /**
   * updateStateByKey function: folds this batch's events into the key's
   * accumulated state. Unknown event types leave the state untouched.
   * Always returns Some, so keys are never evicted from the state store.
   */
  def updateEventState(values: Seq[Event], state: Option[EventState]): Option[EventState] = {
    val currentState = state.getOrElse(EventState.empty)
    val newState = values.foldLeft(currentState) { (acc, event) =>
      event.eventType match {
        case "user_created" => acc.handleUserCreated(event)
        case "user_updated" => acc.handleUserUpdated(event)
        case "transaction"  => acc.handleTransaction(event)
        case "system_event" => acc.handleSystemEvent(event)
        case _              => acc // unknown event type: ignore
      }
    }
    Some(newState)
  }

  /**
   * Projects per-user state into the UserProfile read model.
   * FIX: replaced filter(_._2.isDefined) + .get with a single flatMap over
   * the Option — the two-step form risks NoSuchElementException if the
   * filter and the extraction ever drift apart.
   */
  def generateUserProfileView(eventStream: DStream[(String, Option[EventState])]): DStream[UserProfile] = {
    eventStream.flatMap { case (userId, stateOpt) =>
      stateOpt.toList.map { state =>
        UserProfile(
          userId = userId,
          totalTransactions = state.transactionCount,
          totalAmount = state.totalAmount,
          lastActivity = state.lastActivityTime,
          preferences = state.userPreferences
        )
      }
    }
  }

  /**
   * Emits one stream-wide AggregateMetrics record per batch.
   * Note: count/sum launch distributed jobs inside transform, which runs
   * once per batch on the driver.
   */
  def generateAggregateView(eventStream: DStream[(String, Option[EventState])]): DStream[AggregateMetrics] = {
    eventStream
      .flatMap(_._2)
      .transform { rdd =>
        val totalUsers = rdd.count()
        val totalTransactions = rdd.map(_.transactionCount).sum()
        val totalAmount = rdd.map(_.totalAmount).sum()

        rdd.sparkContext.parallelize(Seq(
          AggregateMetrics(
            timestamp = System.currentTimeMillis(),
            totalUsers = totalUsers,
            totalTransactions = totalTransactions.toLong,
            totalAmount = totalAmount,
            // Guard against division by zero on an empty batch.
            avgTransactionSize = if (totalTransactions > 0) totalAmount / totalTransactions else 0.0
          )
        ))
      }
  }
}
81
/**
 * Accumulated per-user state derived from the event stream.
 *
 * @param transactionCount number of "transaction" events folded in so far
 * @param totalAmount      running sum of transaction values
 * @param lastActivityTime timestamp of the most recent event of any kind
 * @param userPreferences  key/value preferences merged from update events
 */
case class EventState(
  transactionCount: Long,
  totalAmount: Double,
  lastActivityTime: Long,
  userPreferences: Map[String, String]
) {
  /** Records user creation; only the activity timestamp changes. */
  def handleUserCreated(event: Event): EventState =
    copy(lastActivityTime = event.timestamp)

  /**
   * Merges updated preferences and refreshes the activity timestamp.
   * NOTE(review): relies on `event.data` and `parsePreferences`, neither of
   * which is defined in this snippet — confirm against the full codebase.
   */
  def handleUserUpdated(event: Event): EventState =
    copy(
      lastActivityTime = event.timestamp,
      userPreferences = userPreferences ++ parsePreferences(event.data)
    )

  /** Folds a transaction into the running count and total. */
  def handleTransaction(event: Event): EventState =
    copy(
      transactionCount = transactionCount + 1,
      totalAmount = totalAmount + event.value,
      lastActivityTime = event.timestamp
    )

  /** System events only refresh the activity timestamp. */
  def handleSystemEvent(event: Event): EventState =
    copy(lastActivityTime = event.timestamp)
}

object EventState {
  /** Zero state used before any event has been seen for a key. */
  def empty: EventState =
    EventState(
      transactionCount = 0L,
      totalAmount = 0.0,
      lastActivityTime = 0L,
      userPreferences = Map.empty
    )
}

/** Materialized read model of a single user, projected from [[EventState]]. */
case class UserProfile(
  userId: String,
  totalTransactions: Long,
  totalAmount: Double,
  lastActivity: Long,
  preferences: Map[String, String]
)

/** Stream-wide aggregate snapshot emitted once per batch. */
case class AggregateMetrics(
  timestamp: Long,
  totalUsers: Long,
  totalTransactions: Long,
  totalAmount: Double,
  avgTransactionSize: Double
)
object StreamingPerformanceOptimizations {

  /**
   * Builds a StreamingContext tuned for high throughput: backpressure,
   * Kryo serialization and explicit executor sizing, with a 5-second batch
   * interval and HDFS checkpointing.
   */
  def createOptimizedStreamingContext(): StreamingContext = {
    val spark = SparkSession.builder()
      .appName("HighThroughputStreaming")
      // Rate control: adapt ingestion to actual processing speed.
      .config("spark.sql.streaming.metricsEnabled", "true")
      .config("spark.streaming.backpressure.enabled", "true")
      .config("spark.streaming.backpressure.initialRate", "10000")
      .config("spark.streaming.receiver.maxRate", "50000")
      .config("spark.streaming.kafka.maxRatePerPartition", "5000")

      // FIX: dropped "spark.streaming.checkpointDir.cleaner.*" — these are
      // not Spark configuration keys and were silently ignored; Spark cleans
      // old checkpoint data automatically.

      // Serialization: Kryo is faster and more compact than Java serialization.
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.kryo.registrationRequired", "false")

      // Resource sizing.
      .config("spark.executor.cores", "4")
      .config("spark.executor.memory", "8g")
      // FIX: "spark.executor.memoryFraction" is not a Spark setting; the
      // unified memory manager (Spark 1.6+) is tuned via "spark.memory.fraction".
      .config("spark.memory.fraction", "0.8")
      // Drop processed stream RDDs from memory promptly.
      .config("spark.streaming.unpersist", "true")

      // Network tolerance for long GC pauses / slow brokers.
      .config("spark.network.timeout", "600s")
      .config("spark.streaming.kafka.consumer.poll.ms", "1000")

      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
    ssc.checkpoint("hdfs://namenode:9000/streaming/optimized-checkpoints")
    ssc
  }

  /**
   * Persists the stream with replicated serialized storage and coalesces
   * small partitions each batch to cut task-scheduling overhead.
   * NOTE(review): coalesce without shuffle can only reduce the partition
   * count; when the computed target exceeds the current count it is a no-op.
   */
  def optimizeStreamProcessing[T](dstream: DStream[T]): DStream[T] = {
    dstream
      .persist(StorageLevel.MEMORY_AND_DISK_SER_2)
      .transform { rdd =>
        val optimalPartitions = Math.max(rdd.sparkContext.defaultParallelism,
                                         rdd.partitions.length / 2)
        rdd.coalesce(optimalPartitions)
      }
  }

  /**
   * Adjusts a PID rate controller from batch-completion feedback.
   * NOTE(review): the controller's output is not wired back into Spark here;
   * to take effect, its rate must feed maxRatePerPartition or a custom
   * RateEstimator.
   */
  def implementBackpressureControl(ssc: StreamingContext): Unit = {
    val rateController = new PIDRateController()

    ssc.addStreamingListener(new StreamingListener {
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        val processingDelay = batchCompleted.batchInfo.processingDelay.getOrElse(0L)
        val batchDuration = ssc.graph.batchDuration.milliseconds

        // Back off when processing consumes >80% of the batch interval;
        // speed up when it uses <30%.
        if (processingDelay > batchDuration * 0.8) {
          rateController.decreaseRate()
        } else if (processingDelay < batchDuration * 0.3) {
          rateController.increaseRate()
        }
      }
    })
  }
}
68
/**
 * Simple PID controller for adapting the streaming ingest rate.
 *
 * @param currentRate current target rate (records/sec), clamped to [100, 50000]
 * @param kp proportional gain
 * @param ki integral gain
 * @param kd derivative gain
 */
class PIDRateController(
  var currentRate: Double = 10000.0,
  kp: Double = 0.1,
  ki: Double = 0.01,
  kd: Double = 0.05
) {
  // Controller memory: last latency error and its running sum.
  private var lastError: Double = 0.0
  private var errorSum: Double = 0.0

  /**
   * Updates the rate from the latency error (target minus actual) and
   * returns the new, clamped rate.
   */
  def adjustRate(targetLatency: Double, actualLatency: Double): Double = {
    val err = targetLatency - actualLatency
    errorSum += err
    val errDelta = err - lastError

    val correction = kp * err + ki * errorSum + kd * errDelta
    currentRate = clamp(currentRate + correction)

    lastError = err
    currentRate
  }

  /** Multiplicative increase, capped at 50k records/sec. */
  def increaseRate(): Unit = currentRate = Math.min(50000, currentRate * 1.1)

  /** Multiplicative decrease, floored at 100 records/sec. */
  def decreaseRate(): Unit = currentRate = Math.max(100, currentRate * 0.9)

  // Keep the rate inside the allowed operating band.
  private def clamp(rate: Double): Double = Math.max(100, Math.min(50000, rate))
}
object FaultTolerancePatterns {

  /**
   * Checkpoints to a primary HDFS directory and periodically mirrors the
   * checkpoint data to a secondary (S3) location, pruning old data.
   */
  def implementRobustCheckpointing(ssc: StreamingContext): Unit = {
    val primaryCheckpointDir = "hdfs://namenode:9000/streaming/checkpoints/primary"
    val secondaryCheckpointDir = "s3a://backup-bucket/streaming/checkpoints/secondary"

    ssc.checkpoint(primaryCheckpointDir)

    ssc.addStreamingListener(new StreamingListener {
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        // Mirror checkpoints roughly every 5 minutes. NOTE(review): this
        // fires only when the batch time is an exact multiple of 300000 ms,
        // which assumes the batch interval divides 5 minutes evenly.
        if (batchCompleted.batchInfo.batchTime.milliseconds % 300000 == 0) {
          backupCheckpointToSecondary(primaryCheckpointDir, secondaryCheckpointDir)
        }

        // Prune checkpoint data older than 24 hours.
        cleanupOldCheckpoints(primaryCheckpointDir, hoursToKeep = 24)
      }
    })
  }

  /**
   * Builds a StreamingContext that recovers from checkpoint data when
   * available and constructs a fresh context otherwise.
   */
  def createFailsafeStreamingApp(): StreamingContext = {
    // Factory invoked only when no usable checkpoint exists.
    def createStreamingContext(): StreamingContext = {
      val ssc = new StreamingContext(SparkContext.getOrCreate(), Seconds(10))

      // Application-specific streaming topology.
      val stream = createInputStream(ssc)
      val processedStream = processStream(stream)
      persistResults(processedStream)

      ssc
    }

    val checkpointDir = "hdfs://namenode:9000/streaming/checkpoints"
    val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)

    // FIX: "spark.streaming.stopGracefullyOnShutdown" is a SparkConf setting
    // read at shutdown, not a thread-local job property, so the original
    // setLocalProperty call had no effect. Set it on the SparkConf used to
    // build the SparkContext (before getOrCreate) instead.

    ssc
  }

  /**
   * Processes each batch inside a transaction and records the batch id so
   * replayed batches are skipped (idempotent, effectively exactly-once).
   *
   * NOTE(review): the processed-marker is written per partition while the
   * skip-check is per batch; if one partition commits and another fails, a
   * retry may skip the whole batch. For strict guarantees use a
   * per-partition marker (batchId + partition id).
   */
  def implementExactlyOnceProcessing[T](
    stream: DStream[T],
    processFunction: T => Unit,
    outputLocation: String
  ): Unit = {

    stream.foreachRDD { (rdd, time) =>
      val batchId = s"batch_${time.milliseconds}"

      if (!isBatchProcessed(batchId)) {
        rdd.foreachPartition { partition =>
          val connection = createTransactionalConnection()
          try {
            connection.beginTransaction()

            partition.foreach(processFunction)

            // Mark inside the same transaction as the writes so the marker
            // and the data commit (or roll back) together.
            markBatchAsProcessed(batchId, connection)
            connection.commit()
          } catch {
            case ex: Exception =>
              connection.rollback()
              throw ex
          } finally {
            connection.close()
          }
        }
      } else {
        println(s"Batch $batchId already processed, skipping...")
      }
    }
  }
}
1import org.apache.spark.streaming.scheduler._
2import io.prometheus.client._
3import java.util.concurrent.atomic.AtomicLong
4
object StreamingMonitoring {

  // Prometheus metrics exposed by the streaming application.

  /** Seconds spent processing the most recent batch. */
  val batchProcessingTime = Gauge.build()
    .name("spark_streaming_batch_processing_seconds")
    .help("Time taken to process each batch")
    .register()

  /** Running total of input records processed. */
  val recordsProcessed = Counter.build()
    .name("spark_streaming_records_processed_total")
    .help("Total number of records processed")
    .register()

  /** Running total of receiver failures observed. */
  val failedBatches = Counter.build()
    .name("spark_streaming_failed_batches_total")
    .help("Total number of failed batches")
    .register()

  // FIX: this gauge is set to the scheduling delay in seconds, not a count
  // of queued batches; the metric name and help text now match what is
  // actually recorded.
  val queuedBatches = Gauge.build()
    .name("spark_streaming_batch_queue_delay_seconds")
    .help("Time batches spend queued before processing starts, in seconds")
    .register()

  /** Wires a StreamingListener that exports batch metrics and alerts. */
  def setupMonitoring(ssc: StreamingContext): Unit = {
    ssc.addStreamingListener(new StreamingListener {

      override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
        // Time between batch creation and the start of processing.
        val queueDelay = batchStarted.batchInfo.processingStartTime.map(
          _ - batchStarted.batchInfo.batchTime.milliseconds
        ).getOrElse(0L)

        queuedBatches.set(queueDelay / 1000.0)

        logBatchInfo("STARTED", batchStarted.batchInfo)
      }

      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        val batchInfo = batchCompleted.batchInfo
        val processingTime = batchInfo.processingDelay.getOrElse(0L)
        val numRecords = batchInfo.numRecords

        batchProcessingTime.set(processingTime / 1000.0)
        recordsProcessed.inc(numRecords)

        // Alert when a batch takes >1.5x the batch interval: the
        // application is falling behind.
        val batchDuration = ssc.graph.batchDuration.milliseconds
        if (processingTime > batchDuration * 1.5) {
          alertSlowProcessing(batchInfo, processingTime, batchDuration)
        }

        logBatchInfo("COMPLETED", batchInfo)
      }

      override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
        logBatchInfo("SUBMITTED", batchSubmitted.batchInfo)
      }

      override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
        failedBatches.inc()
        // FIX: StreamingListenerReceiverError has no `receiverError` field;
        // the last error message lives on the ReceiverInfo payload.
        logError(s"Receiver error: ${receiverError.receiverInfo.name}",
                 receiverError.receiverInfo.lastErrorMessage)

        // Send alert for receiver failures.
        sendReceiverAlert(receiverError)
      }
    })

    // Expose the Prometheus metrics over HTTP.
    startMetricsServer(8080)
  }

  /** Logs one line of delay/throughput detail for a batch lifecycle event. */
  def logBatchInfo(status: String, batchInfo: BatchInfo): Unit = {
    val processingDelay = batchInfo.processingDelay.getOrElse(0L)
    val schedulingDelay = batchInfo.schedulingDelay.getOrElse(0L)
    val totalDelay = batchInfo.totalDelay.getOrElse(0L)

    println(s"""
      |Batch $status:
      | Time: ${batchInfo.batchTime}
      | Records: ${batchInfo.numRecords}
      | Processing Delay: ${processingDelay}ms
      | Scheduling Delay: ${schedulingDelay}ms
      | Total Delay: ${totalDelay}ms
      |""".stripMargin)
  }

  /**
   * Tracks liveness by recording the wall-clock time of the last completed
   * batch and writing a JSON status file after each batch.
   *
   * FIX: the original called `foreachRDD` on an RDD (a DStream-only method,
   * so it did not compile) and used the compiler-internal
   * `scala.tools.nsc.io.File` for I/O; the status file is now written from
   * the listener with java.nio. In production, expose this via a proper
   * HTTP endpoint (Akka HTTP, etc.) instead of a file.
   */
  def createHealthcheckEndpoint(ssc: StreamingContext): Unit = {
    import java.nio.charset.StandardCharsets
    import java.nio.file.{Files, Paths}

    val healthStatus = new AtomicLong(System.currentTimeMillis())

    ssc.addStreamingListener(new StreamingListener {
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        healthStatus.set(System.currentTimeMillis())

        val statusFile = "/tmp/streaming_health_status"
        val currentTime = System.currentTimeMillis()
        val lastUpdate = healthStatus.get()

        // Healthy when a batch completed within the last minute.
        val status = if (currentTime - lastUpdate < 60000) "HEALTHY" else "UNHEALTHY"
        Files.write(
          Paths.get(statusFile),
          s"""{"status": "$status", "last_update": $lastUpdate, "current_time": $currentTime}"""
            .getBytes(StandardCharsets.UTF_8)
        )
      }
    })
  }
}
object FinancialTransactionStreaming {

  /**
   * Real-time fraud pipeline: parse, enrich, score and alert on high-risk
   * transactions, plus per-merchant-category windowed volume metrics.
   */
  def setupFraudDetectionPipeline(ssc: StreamingContext): Unit = {
    val transactionStream = createKafkaTransactionStream(ssc)

    // Real-time fraud detection.
    val fraudDetection = transactionStream
      .map(parseTransaction)
      .filter(_.amount > 0) // valid transactions only
      .transform(rdd => enrichWithUserProfile(rdd))
      .transform(rdd => enrichWithMerchantData(rdd))
      .map(applyFraudRules)
      .filter(_.riskScore > 0.7) // high-risk transactions only

    fraudDetection.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Push alerts immediately; one alert-service client per partition.
        rdd.foreachPartition { partition =>
          val alertService = createAlertService()
          partition.foreach { transaction =>
            alertService.sendFraudAlert(transaction)
          }
        }

        // Persist the batch for offline investigation.
        rdd.saveAsTextFile(s"hdfs://namenode:9000/fraud-alerts/${System.currentTimeMillis()}")
      }
    }

    // 5-minute window sliding every minute, incremental (inverse-reduce) form.
    // NOTE(review): reduceByKeyAndWindow with an inverse function requires
    // checkpointing to be enabled on the StreamingContext.
    transactionStream
      .map(t => (t.merchantCategory, t.amount))
      .reduceByKeyAndWindow(_ + _, _ - _, Seconds(300), Seconds(60))
      .foreachRDD(updateTransactionMetrics)
  }

  /**
   * Scores a transaction against additive fraud rules.
   * Each rule contributes a fixed weight and a label when it fires; the
   * risk score is the sum of fired weights.
   */
  def applyFraudRules(transaction: EnrichedTransaction): RiskTransaction = {
    // (predicate, weight, label) evaluated in order — declarative
    // replacement for the previous var + mutable-buffer accumulation.
    val rules = Seq(
      // Rule 1: amount far above the user's historical average.
      (transaction.amount > transaction.userProfile.avgTransactionAmount * 5,
        0.3, "high_amount_vs_average"),
      // Rule 2: location the user does not normally transact from.
      (!transaction.userProfile.frequentLocations.contains(transaction.location),
        0.2, "unusual_location"),
      // Rule 3: velocity check over the last 24 hours.
      (transaction.userProfile.transactionsLast24h > 20,
        0.3, "high_velocity"),
      // Rule 4: merchant flagged as high risk.
      (transaction.merchantData.riskCategory == "HIGH",
        0.4, "high_risk_merchant")
    )

    val fired = rules.collect { case (true, weight, label) => (weight, label) }
    val riskScore = fired.map(_._1).sum
    RiskTransaction(transaction, riskScore, fired.map(_._2).toList)
  }
}
69
/** Raw payment event as received from the transaction topic. */
case class Transaction(
  id: String,
  userId: String,
  merchantId: String,
  amount: Double,
  location: String,
  timestamp: Long
)

/**
 * Behavioural context for a single user, used by the fraud rules.
 * NOTE(review): this name clashes with the UserProfile read model defined
 * in the event-sourcing snippet if both end up in one package — confirm.
 */
case class UserProfile(
  avgTransactionAmount: Double,
  frequentLocations: Set[String],
  transactionsLast24h: Int
)

/** Merchant master data, including its risk classification. */
case class MerchantData(
  name: String,
  category: String,
  riskCategory: String
)

/**
 * A transaction joined with its user profile and merchant data.
 * Delegating accessors let rule code read common fields directly.
 */
case class EnrichedTransaction(
  transaction: Transaction,
  userProfile: UserProfile,
  merchantData: MerchantData
) {
  /** Amount of the underlying transaction. */
  def amount: Double = transaction.amount

  /** Location of the underlying transaction. */
  def location: String = transaction.location

  /** Category of the merchant involved. */
  def merchantCategory: String = merchantData.category
}

/** Scoring result: the enriched transaction plus risk score and reasons. */
case class RiskTransaction(
  enrichedTransaction: EnrichedTransaction,
  riskScore: Double,
  riskFactors: List[String]
)
/**
 * IoT analytics pipeline: fans a mixed sensor stream out into temperature
 * alerting, vibration anomaly detection and energy-efficiency analytics,
 * then merges the three result streams for dashboards and storage.
 */
object IoTSensorStreaming {

  def setupIoTAnalyticsPipeline(ssc: StreamingContext): Unit = {
    val readings = createMQTTSensorStream(ssc)

    // Temperature: flag readings outside the safe band over a sliding window.
    val temperatureAlerts = readings
      .filter(r => r.sensorType == "temperature")
      .map(parseTemperatureReading)
      .window(Minutes(5), Minutes(1))
      .filter(r => r.value > 85.0 || r.value < -10.0)
      .map(createTemperatureAlert)

    // Vibration: per-batch anomaly detection on the parsed readings.
    val vibrationAnalysis = readings
      .filter(r => r.sensorType == "vibration")
      .map(parseVibrationReading)
      .transform(batch => detectVibrationAnomalies(batch))

    // Energy: hourly efficiency metrics recomputed every 15 minutes.
    val energyUsageAnalytics = readings
      .filter(r => r.sensorType == "energy")
      .map(parseEnergyReading)
      .window(Hours(1), Minutes(15))
      .transform(batch => calculateEnergyEfficiencyMetrics(batch))

    // Merge the three analytics streams into a single output stream.
    val allAlerts = temperatureAlerts
      .union(vibrationAnalysis)
      .union(energyUsageAnalytics)

    allAlerts.foreachRDD { batch =>
      if (!batch.isEmpty()) {
        // Live dashboard first, then durable storage.
        updateIoTDashboard(batch)
        batch.saveAsTextFile(s"hdfs://namenode:9000/iot-analytics/${System.currentTimeMillis()}")

        // Only critical alerts trigger maintenance notifications.
        val criticalAlerts = batch.filter(a => a.alertLevel == "CRITICAL")
        criticalAlerts.foreachPartition(sendMaintenanceAlerts)
      }
    }
  }
}
This comprehensive guide provides production-ready patterns and implementations for Apache Spark Streaming applications. The examples demonstrate real-world scenarios including fraud detection, IoT analytics, and robust monitoring frameworks essential for mission-critical streaming applications.
For more advanced Spark techniques, explore our related guides on PySpark performance optimization and Apache Spark application development.