MQTT Publisher and Subscriber in Scala with Eclipse Paho
By Prabeesh Keezhathra
MQTT is a lightweight publish-subscribe protocol widely used in IoT and telemetry because it runs fine over slow, lossy networks. This post builds a minimal publisher and subscriber in Scala using the Eclipse Paho library, talking to a local Mosquitto broker. The complete code is on GitHub.
How MQTT works
MQTT uses a central broker. Publishers send messages to topics, and any subscribers listening to a matching topic pattern receive them. Topics are hierarchical strings separated by /, for example sensors/room1/temperature, and subscriptions can use + (single level) and # (multi-level) wildcards.
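To make the wildcard rules concrete, here is a minimal sketch of topic-filter matching in plain Scala. The `TopicMatch` object and its `matches` function are hypothetical names for illustration only; they are not part of Paho, and the sketch ignores the special handling of topics starting with `$`.

```scala
object TopicMatch {
  // Returns true when `topic` matches `filter` under the + and # wildcard rules.
  // Simplified: assumes a well-formed filter and ignores $-prefixed topics.
  def matches(filter: String, topic: String): Boolean = {
    @annotation.tailrec
    def loop(f: List[String], t: List[String]): Boolean = (f, t) match {
      case ("#" :: Nil, _)                  => true         // multi-level: matches whatever remains
      case ("+" :: fs, _ :: ts)             => loop(fs, ts) // single level: matches exactly one level
      case (fh :: fs, th :: ts) if fh == th => loop(fs, ts) // literal level must be equal
      case (Nil, Nil)                       => true         // both exhausted at the same time
      case _                                => false
    }
    loop(filter.split("/", -1).toList, topic.split("/", -1).toList)
  }

  def main(args: Array[String]): Unit = {
    println(matches("sensors/+", "sensors/temperature"))       // true
    println(matches("sensors/+", "sensors/room1/temperature")) // false
    println(matches("sensors/#", "sensors/room1/temperature")) // true
  }
}
```

The broker does this matching for you; the sketch is only meant to show why `sensors/+` catches `sensors/temperature` but not a deeper topic such as `sensors/room1/temperature`.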
Three QoS levels control delivery:
| QoS | Meaning | Use when |
|---|---|---|
| 0 | At most once | Telemetry where the occasional drop is fine |
| 1 | At least once | Most sensor or command messages |
| 2 | Exactly once | When duplicates cause real problems |
Installing the Mosquitto broker
On Ubuntu or Debian:
sudo apt-get install mosquitto mosquitto-clients
sudo systemctl start mosquitto
Quick sanity check with the CLI tools:
# Terminal 1
mosquitto_sub -t test/topic

# Terminal 2
mosquitto_pub -t test/topic -m "Hello MQTT"
sbt setup
build.sbt:
name := "MQTTScalaClient"
version := "0.1"
scalaVersion := "2.10.4"

libraryDependencies += "org.eclipse.paho" % "org.eclipse.paho.client.mqttv3" % "1.0.2"

resolvers += "Eclipse Paho Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/"
Publisher
import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence

object Publisher {
  def main(args: Array[String]): Unit = {
    val brokerUrl = "tcp://localhost:1883"
    val topic = "sensors/temperature"
    val qos = 1
    val clientId = "scala-publisher"

    // In-flight messages are stored on disk under /tmp/mqtt.
    val persistence = new MqttDefaultFilePersistence("/tmp/mqtt")
    val client = new MqttClient(brokerUrl, clientId, persistence)

    // Clean session: the broker keeps no state for this client between connections.
    val opts = new MqttConnectOptions()
    opts.setCleanSession(true)
    client.connect(opts)

    val payload = """{"temperature": 22.5}"""
    val message = new MqttMessage(payload.getBytes("UTF-8"))
    message.setQos(qos)

    // MqttClient.publish blocks until delivery at the requested QoS completes.
    client.publish(topic, message)
    println(s"Published to $topic: $payload")

    client.disconnect()
  }
}
The flow: create an MqttClient, connect with an MqttConnectOptions, build an MqttMessage, call publish, and disconnect. That’s the whole publisher API surface.
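The same flow can be wrapped so the client is always released, even when connecting or publishing fails. This is a minimal sketch, not the code from the repository: `SafePublisher`, `buildMessage`, `publishOnce`, and the client ID `scala-safe-publisher` are names I made up for illustration, and it assumes the same local broker as above.

```scala
import org.eclipse.paho.client.mqttv3.{MqttClient, MqttConnectOptions, MqttException, MqttMessage}
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

object SafePublisher {
  // Hypothetical helper: builds the message on its own so it can be checked without a broker.
  def buildMessage(payload: String, qos: Int): MqttMessage = {
    val message = new MqttMessage(payload.getBytes("UTF-8"))
    message.setQos(qos)
    message
  }

  // Hypothetical helper: connect, publish one message, then always disconnect and close.
  def publishOnce(brokerUrl: String, topic: String, payload: String, qos: Int): Unit = {
    val client = new MqttClient(brokerUrl, "scala-safe-publisher", new MemoryPersistence())
    try {
      val opts = new MqttConnectOptions()
      opts.setCleanSession(true)
      client.connect(opts)
      client.publish(topic, buildMessage(payload, qos))
    } finally {
      // Runs on success and on failure, so the network thread is not leaked.
      if (client.isConnected) client.disconnect()
      client.close()
    }
  }

  def main(args: Array[String]): Unit =
    try publishOnce("tcp://localhost:1883", "sensors/temperature", """{"temperature": 22.5}""", 1)
    catch { case e: MqttException => println(s"Publish failed: ${e.getMessage}") }
}
```

Splitting message construction from the network calls is a design choice for this sketch: it keeps the part that needs a broker as small as possible.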
Subscriber
import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

object Subscriber {
  def main(args: Array[String]): Unit = {
    val brokerUrl = "tcp://localhost:1883"
    val topic = "sensors/+" // every topic one level below sensors/
    val qos = 1
    val clientId = "scala-subscriber"

    val client = new MqttClient(brokerUrl, clientId, new MemoryPersistence())

    // Register the callback before connecting so no message is missed.
    client.setCallback(new MqttCallback {
      override def messageArrived(topic: String, message: MqttMessage): Unit = {
        val payload = new String(message.getPayload, "UTF-8")
        println(s"Received on $topic: $payload")
      }
      override def connectionLost(cause: Throwable): Unit =
        println(s"Connection lost: ${cause.getMessage}")
      // Only relevant when publishing; nothing to do for a pure subscriber.
      override def deliveryComplete(token: IMqttDeliveryToken): Unit = {}
    })

    client.connect()
    client.subscribe(topic, qos)
    println(s"Subscribed to $topic. Press Ctrl-C to exit.")

    // Keep the JVM alive so the callback can fire.
    Thread.currentThread().join()
  }
}
MqttCallback.messageArrived fires on every message matching the subscription. Anything else (filtering, persistence, downstream processing) goes in there.
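As an illustration of what could go inside messageArrived, here is a small sketch that turns a topic and payload into a typed reading. `TemperatureHandler`, `Reading`, and `parse` are hypothetical names, and the regex is a deliberate shortcut that only understands the `{"temperature": 22.5}` shape used in this post; a real application would more likely use a JSON library.

```scala
import scala.util.Try

object TemperatureHandler {
  // Hypothetical reading type, not part of the original example.
  case class Reading(sensor: String, celsius: Double)

  // Matches the numeric value after a "temperature" key (assumed payload shape).
  private val TempField = """temperature"\s*:\s*(-?\d+(?:\.\d+)?)""".r

  // Returns None for topics outside sensors/ or payloads without a numeric temperature.
  def parse(topic: String, payload: String): Option[Reading] =
    for {
      sensor <- topic.split("/").lastOption if topic.startsWith("sensors/")
      m      <- TempField.findFirstMatchIn(payload)
      value  <- Try(m.group(1).toDouble).toOption
    } yield Reading(sensor, value)
}
```

Inside the callback this could be used as `TemperatureHandler.parse(topic, payload).foreach(r => println(r))`, which keeps the callback itself short and the parsing logic testable without a broker.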
Running the example
Start the subscriber first so it’s ready when the publisher fires:
sbt "runMain Subscriber"
In another terminal:
sbt "runMain Publisher"
The subscriber prints the JSON payload. You can also confirm with the CLI tools:
mosquitto_sub -t "sensors/+" -v
A few practical notes
- Clean session: setCleanSession(false) lets the broker queue QoS 1 and 2 messages for your client while it’s offline. Pair with a stable client ID.
- Keep-alive: setKeepAliveInterval sets, in seconds, how long the connection may sit idle before the client pings the broker; lower values detect dropped connections faster but use more bandwidth.
- Last Will and Testament: opts.setWill(topic, payload, qos, retained) lets the broker publish a message on your behalf if the client disconnects uncleanly. Useful for device presence tracking.
- TLS: for anything beyond a local test, use ssl://... URLs and setSocketFactory with a properly configured SSLContext.
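Put together, the options above might look like the following sketch. The will topic `devices/scala-client/status`, the `offline` payload, and the 30-second keep-alive are placeholder values I picked for illustration, and `SSLContext.getDefault` stands in for a properly configured context with your own trust store.

```scala
import javax.net.ssl.SSLContext
import org.eclipse.paho.client.mqttv3.MqttConnectOptions

object ConnectOptionsSketch {
  def build(): MqttConnectOptions = {
    val opts = new MqttConnectOptions()
    // Persistent session: the broker keeps subscriptions and queued messages.
    opts.setCleanSession(false)
    // Keep-alive interval in seconds (placeholder value).
    opts.setKeepAliveInterval(30)
    // Last Will: published by the broker if this client drops uncleanly (placeholder topic and payload).
    opts.setWill("devices/scala-client/status", "offline".getBytes("UTF-8"), 1, true)
    // TLS: only takes effect with an ssl:// broker URL; the default context is an assumption.
    opts.setSocketFactory(SSLContext.getDefault.getSocketFactory)
    opts
  }
}
```

Pass the result to `client.connect(opts)` in place of the plain options used in the publisher.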
For related Scala and data-processing content, see Apache Spark performance tuning.