In a previous post, we looked at using the Dapr InfluxDB component to write data to InfluxDB Cloud. In this post, we will take a look at reading data from an MQTT topic and storing it in InfluxDB. We will use Dapr 0.10, which includes both components.
To get up to speed with Dapr, please read the previous post and make sure you have an InfluxDB instance up and running in the cloud.
If you want to see a video instead:
Note that the video sends output to both InfluxDB and Azure SignalR. In addition, the video uses Dapr 0.8 with a custom compiled Dapr because I was still developing and testing the InfluxDB component.
MQTT Server
Although there are cloud-based MQTT servers you can use, let’s mix it up a little and run the MQTT server from Docker. If you have Docker installed, type the following:
docker run -it -p 1883:1883 -p 9001:9001 eclipse-mosquitto
The above command runs Mosquitto and exposes port 1883, the default MQTT port, on your local machine. You can use a tool such as MQTT Explorer to send data. Install MQTT Explorer on your local machine and run it. Create a connection to localhost on port 1883, as shown in the screenshot below:

Now, click Connect to connect to Mosquitto. With MQTT, you send data to topics of your choice. Publish a JSON message to a topic called test as shown below:

You can now click the topic in the list of topics and see its most recent value:

Using MQTT with Dapr
You are now ready to read data from an MQTT topic with Dapr. If you have Dapr installed, you can run the following code to read from the test topic and store the data in InfluxDB:
const express = require('express');
const bodyParser = require('body-parser');

const app = express();
app.use(bodyParser.json());
const port = 3000;

// the mqtt component will post messages from the test topic here
app.post('/mqtt', (req, res) => {
    console.log("MQTT Binding Trigger");
    console.log(req.body);

    // body is expected to contain room and temperature
    const temperature = req.body.temperature;

    // room should not contain spaces
    const room = req.body.room.split(" ").join("_");

    // create message for influx component
    const message = {
        "measurement": "stat",
        "tags": `room=${room}`,
        "values": `temperature=${temperature}`
    };

    // send the message to influx output binding
    res.send({
        "to": ["influx"],
        "data": message
    });
});

app.listen(port, () => console.log(`Node App listening on port ${port}!`));
In this example, we use Node.js instead of Python to illustrate that Dapr works with any language. Save the code as app.js. You will also need the following package.json; run npm install to install the dependencies:
{
    "name": "mqttapp",
    "version": "1.0.0",
    "description": "",
    "main": "app.js",
    "scripts": {
        "test": "echo \"Error: no test specified\" && exit 1"
    },
    "author": "",
    "license": "ISC",
    "dependencies": {
        "body-parser": "^1.18.3",
        "express": "^4.16.4"
    }
}
In the previous post about InfluxDB, we used an output binding. You use an output binding by posting data to a Dapr HTTP URI.
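To make "posting data to a Dapr HTTP URI" concrete, here is a minimal sketch of such a call from Node.js using only the built-in http module. It assumes the Dapr sidecar listens on its default HTTP port 3500 (or the port in the DAPR_HTTP_PORT environment variable), that the output binding is named influx, and that the binding accepts the create operation. The function names buildBindingRequest and invokeBinding are made up for this illustration.

```javascript
const http = require('http');

// Assumption: the sidecar uses the default HTTP port 3500 unless DAPR_HTTP_PORT is set.
const DAPR_HTTP_PORT = process.env.DAPR_HTTP_PORT || 3500;

// Build the request options and JSON body for invoking an output binding.
function buildBindingRequest(bindingName, data) {
  // Assumption: "create" is the operation the target binding expects.
  const body = JSON.stringify({ data: data, operation: 'create' });
  return {
    options: {
      host: 'localhost',
      port: DAPR_HTTP_PORT,
      path: `/v1.0/bindings/${bindingName}`,
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Content-Length': Buffer.byteLength(body)
      }
    },
    body: body
  };
}

// Send the request to the sidecar; resolves with the HTTP status code.
function invokeBinding(bindingName, data) {
  const { options, body } = buildBindingRequest(bindingName, data);
  return new Promise((resolve, reject) => {
    const req = http.request(options, (res) => {
      res.resume(); // drain the response so the 'end' event fires
      res.on('end', () => resolve(res.statusCode));
    });
    req.on('error', reject);
    req.end(body);
  });
}

// Usage (requires a running sidecar with an "influx" component):
// invokeBinding('influx', {
//   measurement: 'stat',
//   tags: 'room=kitchen',
//   values: 'temperature=20'
// }).then((status) => console.log(status));
```

Nothing is sent until invokeBinding is called, so the request can be inspected with buildBindingRequest before a sidecar is running.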
To use an input binding like MQTT, you will need to create an HTTP server. Above, we create an HTTP server with Express, and listen on port 3000 for incoming requests. Later, we will instruct Dapr to listen for messages on an MQTT topic and, when a message arrives, post it to our server. We can then retrieve the message from the request body.
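Because the message arrives as a plain request body, the conversion to an InfluxDB message can live in a small pure function that is easy to test on its own. The sketch below is one possible way to do that. The name toInfluxMessage and the validation rules are our own additions, not something Dapr requires.

```javascript
// Convert an MQTT payload into the message shape sent to the influx binding.
// Returns null when room or temperature is missing or unusable.
function toInfluxMessage(body) {
  if (!body || typeof body.room !== 'string' || body.room.trim() === '') {
    return null;
  }

  // Accept numbers and numeric strings such as "21.5".
  const temperature = typeof body.temperature === 'string'
    ? parseFloat(body.temperature)
    : body.temperature;
  if (typeof temperature !== 'number' || !Number.isFinite(temperature)) {
    return null;
  }

  // Tag values should not contain spaces, so replace whitespace with underscores.
  const room = body.room.trim().replace(/\s+/g, '_');

  return {
    measurement: 'stat',
    tags: `room=${room}`,
    values: `temperature=${temperature}`
  };
}

console.log(toInfluxMessage({ room: 'living room', temperature: 21.5 }));
console.log(toInfluxMessage({ temperature: 21.5 })); // no room, so null
```

In the /mqtt handler, a null result could be answered with an empty response instead of forwarding incomplete data.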
To tell Dapr what to do, we’ll create a components folder in the same folder that holds the Node.js code. Put a file in that folder with the following contents:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt
spec:
  type: bindings.mqtt
  metadata:
  - name: url
    value: mqtt://localhost:1883
  - name: topic
    value: test
Above, we configure the MQTT component to listen to the topic test on mqtt://localhost:1883. The component name (metadata.name) is important because it must correspond to the route of our HTTP handler (/mqtt).
Like in the previous post, there’s another file that configures the InfluxDB component:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: influx
spec:
  type: bindings.influx
  metadata:
  - name: Url
    value: http://localhost:9999
  - name: Token
    value: ""
  - name: Org
    value: ""
  - name: Bucket
    value: ""
Replace the Url, Token, Org and Bucket values in the file above with those of your own InfluxDB instance.
Saving the MQTT request body to InfluxDB
If you looked at the Node.js code, you probably noticed that we send a response body in the /mqtt handler:
res.send({
    "to": ["influx"],
    "data": message
});
Dapr accepts JSON responses from an input binding handler that include a to and a data field. The above response tells Dapr to send the message in the data field to the output binding named influx.
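Since to is an array, the same response shape can name more than one output binding. The sketch below wraps that in a small helper. The helper name bindingResponse and the second component name signalr are hypothetical and only serve as illustration. Returning an empty object when there are no targets is our own choice, on the assumption that Dapr forwards nothing when the to field is absent.

```javascript
// Build the response body Dapr reads after calling an input binding handler.
// `targets` lists the names of the output bindings that should receive `data`.
function bindingResponse(targets, data) {
  const names = targets.filter((name) => typeof name === 'string' && name.length > 0);
  if (names.length === 0) {
    // No valid targets: return an object without a "to" field.
    return {};
  }
  return { to: names, data: data };
}

const message = { measurement: 'stat', tags: 'room=kitchen', values: 'temperature=20' };

// "signalr" is a hypothetical second component name, used only for illustration.
console.log(JSON.stringify(bindingResponse(['influx', 'signalr'], message)));
```

In the Express handler this would be used as res.send(bindingResponse(['influx'], message)).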
Does it work?
Let’s run the code with Dapr to see if it works:
dapr run --app-id mqqtinflux --app-port 3000 --components-path=./components node app.js
In dapr run, we also need to specify the port our app uses. Remember that Dapr will post JSON data to our /mqtt handler!
Let’s post some JSON with the expected fields of temperature and room to our MQTT server:

The Dapr logs show the following:

The data also shows up in the InfluxDB Cloud table view:

Conclusion
Dapr makes it really easy to retrieve data with input bindings and send that data somewhere else with output bindings. There are many other input and output bindings, so make sure you check them out on GitHub!