By Prabeesh Keezhathra
10 minutes read - 1969 words
MQTT (Message Queuing Telemetry Transport) has become the backbone of IoT communication, enabling lightweight, reliable messaging between devices and applications. In this tutorial, we’ll build a complete MQTT publisher-subscriber system in Scala using the Eclipse Paho library, suited to IoT data streaming and distributed messaging architectures.
MQTT is a lightweight publish-subscribe messaging protocol designed for resource-constrained devices and unreliable networks. Originally developed at IBM for oil pipeline monitoring, MQTT has become the de facto standard for IoT communication because of its minimal overhead and its configurable delivery guarantees.
In an MQTT system, publishers send messages to specific topics through a central broker, which then routes these messages to all subscribers listening on those topics. This pattern enables highly scalable, loosely-coupled distributed systems.
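To make the routing idea concrete, here is a minimal in-memory sketch of the pattern. `ToyBroker` and `ToyBrokerDemo` are hypothetical names invented for this illustration; there is no networking, QoS, or wildcard support, and this is not how Mosquitto itself is implemented.

```scala
import scala.collection.mutable

// Toy broker: maps each exact topic name to the handlers subscribed to it.
final class ToyBroker {
  private val subscribers =
    mutable.Map.empty[String, mutable.ListBuffer[String => Unit]]

  def subscribe(topic: String)(handler: String => Unit): Unit = {
    subscribers.getOrElseUpdate(topic, mutable.ListBuffer.empty[String => Unit]) += handler
    ()
  }

  // Delivers the payload to every subscriber of the topic and
  // returns how many subscribers received it.
  def publish(topic: String, payload: String): Int = {
    val handlers = subscribers.get(topic).map(_.toList).getOrElse(Nil)
    handlers.foreach(handler => handler(payload))
    handlers.size
  }
}

object ToyBrokerDemo {
  def run(): List[String] = {
    val broker = new ToyBroker
    val received = mutable.ListBuffer.empty[String]
    def record(who: String): String => Unit =
      payload => { received += s"$who got $payload"; () }

    broker.subscribe("sensors/temperature")(record("dashboard"))
    broker.subscribe("sensors/temperature")(record("logger"))

    broker.publish("sensors/temperature", "21.5") // reaches both subscribers
    broker.publish("sensors/humidity", "40")      // no subscribers, dropped
    received.toList
  }
}
```

The publisher never references the dashboard or the logger directly; it only names a topic. That indirection is what keeps publishers and subscribers loosely coupled.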
Mosquitto is a robust, open-source MQTT broker written in C, designed for efficiency and reliability in production environments. It supports the full range of MQTT protocol features.
Installation on Ubuntu/Debian:
# Install Mosquitto broker and clients
sudo apt-get update
sudo apt-get install mosquitto mosquitto-clients

# Start Mosquitto service
sudo systemctl start mosquitto
sudo systemctl enable mosquitto

# Test installation: start the subscriber in one terminal first...
mosquitto_sub -t test/topic
# ...then publish from a second terminal
# (a non-retained message sent before subscribing is never delivered)
mosquitto_pub -t test/topic -m "Hello MQTT"
Basic Mosquitto Configuration (/etc/mosquitto/mosquitto.conf):
# Basic configuration
port 1883
allow_anonymous true
# Logging
log_type all
log_dest file /var/log/mosquitto/mosquitto.log
# Persistence
persistence true
persistence_location /var/lib/mosquitto/
# Security (for production)
# password_file /etc/mosquitto/passwd
# acl_file /etc/mosquitto/acl
Eclipse Paho provides MQTT client implementations across multiple programming languages, maintained by the Eclipse Foundation community. The Java client, which we use from Scala, is production-tested, widely adopted, and actively maintained, making it a solid choice for enterprise IoT applications.
Our complete MQTT implementation is available in the GitHub repository. Let’s walk through building a robust publisher-subscriber system.
Enhanced build.sbt configuration:
// build.sbt
name := "MQTTScalaClient"
version := "0.3.0"
scalaVersion := "2.12.17"

// Core MQTT dependency
libraryDependencies ++= Seq(
  "org.eclipse.paho" % "org.eclipse.paho.client.mqttv3" % "1.2.5",
  "com.typesafe" % "config" % "1.4.2",
  "ch.qos.logback" % "logback-classic" % "1.2.12",
  "org.scalatest" %% "scalatest" % "3.2.15" % Test
)

// Eclipse repository for Paho releases
resolvers += "Eclipse Paho Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/"

// Compiler options for better code quality
scalacOptions ++= Seq(
  "-deprecation",
  "-feature",
  "-unchecked",
  "-Xfatal-warnings"
)
This enhanced build configuration includes logging, configuration management, and testing frameworks essential for production applications.
// Publisher.scala
package main.scala

import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import scala.util.{Try, Success, Failure}

/**
 * Production-ready MQTT publisher with error handling and reconnection logic
 * @author Prabeesh Keezhathra
 * @mail prabsmails@gmail.com
 */
object Publisher {
  // A plain object with an explicit main method: `extends App` already
  // defines main, so declaring both does not compile.

  // Configuration constants
  private val BROKER_URL = "tcp://localhost:1883"
  private val TOPIC = "sensors/temperature"
  private val QOS = 2 // Exactly once delivery
  private val CLIENT_ID = s"scala-publisher-${System.currentTimeMillis()}"

  private val persistence = new MqttDefaultFilePersistence("/tmp/mqtt-persistence")
  private val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(2)

  @volatile private var client: MqttClient = _

  def main(args: Array[String]): Unit = {
    // try/finally guarantees cleanup; Try captures a failure for reporting
    try {
      Try {
        setupMqttClient()
        startPublishing()

        // Keep application running
        println("Publisher started. Press Enter to stop...")
        scala.io.StdIn.readLine()
      } match {
        case Success(_) => println("Publisher completed successfully")
        case Failure(exception) =>
          println(s"Publisher failed: ${exception.getMessage}")
          exception.printStackTrace()
      }
    } finally {
      cleanup()
    }
  }

  private def setupMqttClient(): Unit = {
    client = new MqttClient(BROKER_URL, CLIENT_ID, persistence)

    // Configure connection options
    val connOpts = new MqttConnectOptions()
    connOpts.setCleanSession(false) // Maintain session state
    connOpts.setKeepAliveInterval(30)
    connOpts.setConnectionTimeout(10)
    connOpts.setAutomaticReconnect(true)

    // Set up connection callback
    client.setCallback(new MqttCallback {
      override def messageArrived(topic: String, message: MqttMessage): Unit = {
        // Publisher typically doesn't receive messages
      }

      override def connectionLost(cause: Throwable): Unit = {
        println(s"Connection lost: ${cause.getMessage}")
        println("Attempting to reconnect...")
      }

      override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
        println(s"Message delivery completed: ${token.getMessageId}")
      }
    })

    // Connect to broker
    println(s"Connecting to broker: $BROKER_URL")
    client.connect(connOpts)
    println("Successfully connected to MQTT broker")
  }

  private def startPublishing(): Unit = {
    scheduler.scheduleAtFixedRate(
      () => publishSensorData(),
      0L, // Initial delay
      5L, // Period
      TimeUnit.SECONDS
    )
  }

  private def publishSensorData(): Unit = {
    // Ask the client for its live state instead of caching a flag: a cached
    // flag would stay false after an automatic reconnect succeeds.
    if (client != null && client.isConnected) {
      Try {
        // Simulate sensor data
        val temperature = 20.0 + scala.util.Random.nextGaussian() * 5.0
        val timestamp = System.currentTimeMillis()
        val payload = s"""{"temperature": $temperature, "timestamp": $timestamp, "sensor_id": "temp_001"}"""

        val message = new MqttMessage(payload.getBytes("UTF-8"))
        message.setQos(QOS)
        message.setRetained(false)

        client.publish(TOPIC, message)
        println(s"Published: $payload")
      } match {
        case Success(_) => // Message published successfully
        case Failure(exception) =>
          println(s"Failed to publish message: ${exception.getMessage}")
      }
    } else {
      println("Client not connected, skipping publish...")
    }
  }

  private def cleanup(): Unit = {
    scheduler.shutdown()
    if (client != null) {
      if (client.isConnected) client.disconnect()
      // Release client resources even if the connection was already down
      Try(client.close())
    }
    println("Publisher resources cleaned up")
  }
}
// Subscriber.scala
package main.scala

import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import scala.util.{Try, Success, Failure}
import scala.concurrent.{Future, ExecutionContext}
import java.util.concurrent.Executors

/**
 * Production-ready MQTT subscriber with robust error handling
 * @author Prabeesh Keezhathra
 * @mail prabsmails@gmail.com
 */
object Subscriber {
  // A plain object with an explicit main method: `extends App` already
  // defines main, so declaring both does not compile.

  private val BROKER_URL = "tcp://localhost:1883"
  private val TOPIC_PATTERN = "sensors/+" // Subscribe to all sensor topics
  private val QOS = 2
  // Note: a persistent session (cleanSession = false) is only resumed
  // when the same client ID is reused across restarts.
  private val CLIENT_ID = s"scala-subscriber-${System.currentTimeMillis()}"

  private val persistence = new MemoryPersistence()
  private val executor = Executors.newFixedThreadPool(4)
  private implicit val executionContext: ExecutionContext =
    ExecutionContext.fromExecutor(executor)

  def main(args: Array[String]): Unit = {
    Try {
      val client = setupMqttClient()
      println("Subscriber started. Press Enter to stop...")
      scala.io.StdIn.readLine()

      client.disconnect()
      client.close()
    } match {
      case Success(_) => println("Subscriber completed successfully")
      case Failure(exception) =>
        println(s"Subscriber failed: ${exception.getMessage}")
        exception.printStackTrace()
    }
    // Stop the worker threads so the JVM can exit
    executor.shutdown()
  }

  private def setupMqttClient(): MqttClient = {
    val client = new MqttClient(BROKER_URL, CLIENT_ID, persistence)

    // Configure connection options
    val connOpts = new MqttConnectOptions()
    connOpts.setCleanSession(false)
    connOpts.setKeepAliveInterval(30)
    connOpts.setAutomaticReconnect(true)

    // Set up comprehensive callback handling
    client.setCallback(new MqttCallback {
      override def messageArrived(topic: String, message: MqttMessage): Unit = {
        // Process messages asynchronously to avoid blocking
        Future {
          processMessage(topic, message)
        }.recover {
          case exception =>
            println(s"Error processing message from topic $topic: ${exception.getMessage}")
        }
      }

      override def connectionLost(cause: Throwable): Unit = {
        println(s"Connection lost: ${cause.getMessage}")
        println("Automatic reconnection will be attempted...")
      }

      override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
        // Not typically used in subscriber
      }
    })

    // Connect and subscribe
    println(s"Connecting to broker: $BROKER_URL")
    client.connect(connOpts)

    client.subscribe(TOPIC_PATTERN, QOS)
    println(s"Subscribed to topic pattern: $TOPIC_PATTERN")

    client
  }

  private def processMessage(topic: String, message: MqttMessage): Unit = {
    Try {
      val payload = new String(message.getPayload, "UTF-8")
      val qos = message.getQos
      val retained = message.isRetained

      println(s"📨 Message received:")
      println(s"  Topic: $topic")
      println(s"  QoS: $qos")
      println(s"  Retained: $retained")
      println(s"  Payload: $payload")
      println(s"  Timestamp: ${java.time.Instant.now()}")
      println("─" * 50)

      // Add your message processing logic here
      processBusinessLogic(topic, payload)
    } match {
      case Success(_) => // Message processed successfully
      case Failure(exception) =>
        println(s"Failed to process message: ${exception.getMessage}")
    }
  }

  private def processBusinessLogic(topic: String, payload: String): Unit = {
    // Example: Parse JSON, store in database, trigger alerts, etc.
    topic match {
      case t if t.startsWith("sensors/temperature") =>
        println("🌡️ Processing temperature data...")
        // Parse temperature JSON, check thresholds, etc.

      case t if t.startsWith("sensors/humidity") =>
        println("💧 Processing humidity data...")

      case _ =>
        println("🔄 Processing generic sensor data...")
    }
  }
}
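The temperature branch of the subscriber leaves the parsing and threshold check as a comment. One way that step could look is sketched below. `TemperatureCheck`, `extractTemperature`, `classify`, and the 30 °C default are all hypothetical choices made for this illustration, and the regex is a stand-in for a real JSON parser that only understands the payload format the Publisher emits.

```scala
object TemperatureCheck {
  // Matches the "temperature" field of the Publisher's JSON payload.
  // Assumption: a regex is enough for this fixed format; a production
  // system would use a proper JSON library.
  private val TemperatureField =
    """"temperature"\s*:\s*(-?\d+(?:\.\d+)?(?:[eE][-+]?\d+)?)""".r

  def extractTemperature(payload: String): Option[Double] =
    TemperatureField.findFirstMatchIn(payload).map(_.group(1).toDouble)

  // Classifies a payload against a hypothetical alert threshold.
  def classify(payload: String, maxCelsius: Double = 30.0): String =
    extractTemperature(payload) match {
      case Some(t) if t > maxCelsius => s"ALERT: $t exceeds $maxCelsius"
      case Some(t)                   => s"OK: $t"
      case None                      => "UNPARSEABLE"
    }
}
```

Calling `TemperatureCheck.classify(payload)` from the temperature case of `processBusinessLogic` would be one place to plug this in.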
Choose the appropriate QoS level based on your reliability requirements:
// QoS 0: At most once (Fire and forget)
message.setQos(0) // Fastest, no guarantees

// QoS 1: At least once (Acknowledged delivery)
message.setQos(1) // Guaranteed delivery, possible duplicates

// QoS 2: Exactly once (Assured delivery)
message.setQos(2) // Guaranteed exactly once, highest overhead
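If the bare integers feel error-prone, the three levels can be wrapped in a small type. The sketch below is one possible encoding; the `QoS` type and the `choose` policy are hypothetical helpers for this article, not part of the Paho API.

```scala
// The three MQTT QoS levels as a closed set of values.
sealed abstract class QoS(val level: Int, val guarantee: String)

object QoS {
  case object AtMostOnce  extends QoS(0, "at most once")
  case object AtLeastOnce extends QoS(1, "at least once")
  case object ExactlyOnce extends QoS(2, "exactly once")

  // Hypothetical policy: pick the cheapest level the application can tolerate.
  def choose(lossTolerated: Boolean, duplicatesTolerated: Boolean): QoS =
    if (lossTolerated) AtMostOnce
    else if (duplicatesTolerated) AtLeastOnce
    else ExactlyOnce
}
```

With Paho this would be used as `message.setQos(QoS.choose(lossTolerated = false, duplicatesTolerated = true).level)`, which selects QoS 1.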
Design hierarchical topics for scalable message routing:
// Good topic hierarchy
"building/floor1/room101/temperature"
"building/floor1/room101/humidity"
"building/floor2/room201/temperature"

// Subscription patterns
client.subscribe("building/+/+/temperature", 1) // All temperatures
client.subscribe("building/floor1/+/+", 1)      // All floor 1 sensors
client.subscribe("building/floor1/room101/+", 1) // All room 101 sensors
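To see why those patterns select the topics they do, here is a small sketch of the wildcard rules: `+` matches exactly one topic level and `#` matches all remaining levels. The broker does this matching for you; `TopicFilter.matches` is a hypothetical helper written only to illustrate the semantics, and it ignores the special handling of topics that start with `$`.

```scala
object TopicFilter {
  // Returns true when `topic` is selected by the subscription `filter`.
  def matches(filter: String, topic: String): Boolean = {
    @annotation.tailrec
    def loop(f: List[String], t: List[String]): Boolean = (f, t) match {
      case ("#" :: Nil, _)                   => true          // multi-level wildcard
      case ("+" :: fs, _ :: ts)              => loop(fs, ts)  // single-level wildcard
      case (fh :: fs, th :: ts) if fh == th  => loop(fs, ts)  // literal level
      case (Nil, Nil)                        => true          // both fully consumed
      case _                                 => false
    }
    // limit -1 keeps empty trailing levels, which are significant in MQTT
    loop(filter.split("/", -1).toList, topic.split("/", -1).toList)
  }
}
```

For example, `TopicFilter.matches("building/+/+/temperature", "building/floor2/room201/temperature")` is true, while the same filter rejects `building/floor1/room101/humidity`.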
Implement robust connection handling for production systems:
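// Requires: import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
private def createResilientConnection(): MqttClient = {
  // BROKER_URL and CLIENT_ID are the constants defined in the client objects above
  val client = new MqttClient(BROKER_URL, CLIENT_ID, new MemoryPersistence())

  val connOpts = new MqttConnectOptions()
  connOpts.setAutomaticReconnect(true)
  connOpts.setCleanSession(false)
  connOpts.setKeepAliveInterval(30)
  connOpts.setMaxInflight(1000)

  // Last Will and Testament: published by the broker if this client
  // disappears without a clean disconnect
  connOpts.setWill(
    "clients/disconnect",
    s"Client $CLIENT_ID disconnected unexpectedly".getBytes("UTF-8"),
    1,   // QoS
    true // Retained
  )

  client.connect(connOpts)
  client
}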
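Automatic reconnection retries with a growing delay rather than hammering the broker. The sketch below reproduces only that arithmetic. `Backoff.delaysSeconds` is a hypothetical helper, and the defaults (start at 1 second, double each time, cap at 128 seconds) are assumptions based on the doubling behaviour described in the Paho documentation, so check them against the client version you use.

```scala
object Backoff {
  // Delay before each reconnect attempt: doubles after every failure
  // until it reaches the cap, then stays there.
  def delaysSeconds(attempts: Int, initial: Long = 1L, cap: Long = 128L): List[Long] =
    Iterator.iterate(initial)(delay => math.min(delay * 2, cap)).take(attempts).toList
}
```

With these assumed defaults, nine consecutive failures give waits of 1, 2, 4, 8, 16, 32, 64, 128, and 128 seconds, so a broker outage of a few minutes costs only a handful of connection attempts per client.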
For production deployments, implement proper security:
import javax.net.ssl.SSLSocketFactory

// SSL/TLS configuration (the broker URL must use the ssl:// scheme)
val connOpts = new MqttConnectOptions()
connOpts.setSocketFactory(SSLSocketFactory.getDefault())

// Username/password authentication
connOpts.setUserName("your-username")
connOpts.setPassword("your-password".toCharArray)
Configure clients for optimal performance:
// Connection optimization
connOpts.setMaxInflight(1000)     // Maximum unacknowledged messages
connOpts.setConnectionTimeout(30) // Connection timeout (seconds)
connOpts.setKeepAliveInterval(60) // Heartbeat interval (seconds)

// Message optimization
message.setRetained(false) // Don't retain unless necessary
message.setQos(1)          // Use QoS 1 for most cases
Implement comprehensive logging and monitoring:
// Add structured logging
import org.slf4j.LoggerFactory

private val logger = LoggerFactory.getLogger(getClass)

override def messageArrived(topic: String, message: MqttMessage): Unit = {
  logger.info(s"Message received - Topic: $topic, Size: ${message.getPayload.length} bytes")
  // Process message
}

override def connectionLost(cause: Throwable): Unit = {
  logger.error("MQTT connection lost", cause)
  // Implement alerting logic
}
This MQTT foundation enables numerous IoT and messaging applications:
// Sensor data aggregation
sensors/+/temperature -> Data Processing -> Analytics Dashboard
sensors/+/alerts -> Alert Processing -> Notification System

// Event-driven architecture
orders/created -> Order Processing Service
orders/updated -> Inventory Service
orders/completed -> Billing Service

// Live data visualization
metrics/cpu -> Dashboard
metrics/memory -> Dashboard
metrics/network -> Dashboard
Terminal Testing:
# Test subscriber (start this first, in one terminal)
mosquitto_sub -t "sensors/+" -v

# Test publisher (in another terminal)
mosquitto_pub -t sensors/test -m "Hello from command line"
Load Testing:
# Publish multiple messages rapidly
for i in {1..1000}; do
  mosquitto_pub -t sensors/load-test -m "Message $i"
done
This comprehensive MQTT tutorial provides a solid foundation for building scalable, reliable IoT messaging systems in Scala. The combination of Eclipse Paho’s robust client library with Scala’s functional programming capabilities creates an excellent platform for modern distributed applications.
For more advanced Scala tutorials and IoT development patterns, explore our related guides on Apache Spark for IoT data processing and building real-time data pipelines.