From MQTT to InfluxDB with Dapr

In a previous post, we looked at using the Dapr InfluxDB component to write data to InfluxDB Cloud. In this post, we will take a look at reading data from an MQTT topic and storing it in InfluxDB. We will use Dapr 0.10, which includes both components.

To get up to speed with Dapr, please read the previous post and make sure you have an InfluxDB instance up and running in the cloud.

If you want to see a video instead:

MQTT to Influx with Dapr

Note that the video sends output to both InfluxDB and Azure SignalR. In addition, the video uses Dapr 0.8 with a custom compiled Dapr because I was still developing and testing the InfluxDB component.

MQTT Server

Although there are cloud-based MQTT servers you can use, let’s mix it up a little and run the MQTT server from Docker. If you have Docker installed, type the following:

docker run -it -p 1883:1883 -p 9001:9001 eclipse-mosquitto

The above command runs Mosquitto and exposes port 1883 on your local machine. You can use a tool such as MQTT Explorer to send data. Install MQTT Explorer on your local machine and run it. Create a connection like in the below screenshot:

MQTT Explorer connection

Now, click Connect to connect to Mosquitto. With MQTT, you send data to topics of your choice. Publish a json message to a topic called test as shown below:

Publish json data to the test topic

You can now click the topic in the list of topics and see its most recent value:

Subscribing to the test topic

Using MQTT with Dapr

You are now ready to read data from an MQTT topic with Dapr. If you have Dapr installed, you can run the following code to read from the test topic and store the data in InfluxDB:

const express = require('express');
const bodyParser = require('body-parser');

const app = express();
app.use(bodyParser.json());

const port = 3000;

// mqtt component will post messages from influx topic here
app.post('/mqtt', (req, res) => {
    console.log("MQTT Binding Trigger");
    console.log(req.body)

    // body is expected to contain room and temperature
    room = req.body.room
    temperature = req.body.temperature

    // room should not contain spaces
    room = room.split(" ").join("_")

    // create message for influx component
    message = {
        "measurement": "stat",
        "tags": `room=${room}`,
        "values": `temperature=${temperature}`
    };
    
    // send the message to influx output binding
    res.send({
        "to": ["influx"],
        "data": message
    });
});

app.listen(port, () => console.log(`Node App listening on port ${port}!`));

In this example, we use Node.js instead of Python to illustrate that Dapr works with any language. You will also need this package.json and run npm install:

{
  "name": "mqttapp",
  "version": "1.0.0",
  "description": "",
  "main": "app.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "",
  "license": "ISC",
  "dependencies": {
    "body-parser": "^1.18.3",
    "express": "^4.16.4"
  }
}

In the previous post about InfluxDB, we used an output binding. You use an output binding by posting data to a Dapr HTTP URI.

To use an input binding like MQTT, you will need to create an HTTP server. Above, we create an HTTP server with Express, and listen on port 3000 for incoming requests. Later, we will instruct Dapr to listen for messages on an MQTT topic and, when a message arrives, post it to our server. We can then retrieve the message from the request body.

To tell Dapr what to do, we’ll create a components folder in the same folder that holds the Node.js code. Put a file in that folder with the following contents:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt
spec:
  type: bindings.mqtt
  metadata:
  - name: url
    value: mqtt://localhost:1883
  - name: topic
    value: test

Above, we configure the MQTT component to list to topic test on mqtt://localhost:1883. The name we use (in metadata) is important because that needs to correspond to our HTTP handler (/mqtt).

Like in the previous post, there’s another file that configures the InfluxDB component:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: influx
spec:
  type: bindings.influx
  metadata:
  - name: Url
    value: http://localhost:9999
  - name: Token
    value: ""
  - name: Org
    value: ""
  - name: Bucket
    value: ""

Replace the parameters in the file above with your own.

Saving the MQTT request body to InfluxDB

If you look at the Node.js code, you have probably noticed that we send a response body in the /mqtt handler:

res.send({
        "to": ["influx"],
        "data": message
    });

Dapr is written to accept responses that include a to and a data field in the JSON response. The above response simply tells Dapr to send the message in the data field to the configured influx component.

Does it work?

Let’s run the code with Dapr to see if it works:

dapr run --app-id mqqtinflux --app-port 3000 --components-path=./components node app.js

In dapr run, we also need to specify the port our app uses. Remember that Dapr will post JSON data to our /mqtt handler!

Let’s post some JSON with the expected fields of temperature and room to our MQTT server:

Posting data to the test topic

The Dapr logs show the following:

Logs from the APP (appear alongside the Dapr logs)

In InfluxDB Cloud table view:

Data stored in InfluxDB Cloud (posted some other data points before)

Conclusion

Dapr makes it really easy to retrieve data with input bindings and send that data somewhere else with output bindings. There are many other input and output bindings so make sure you check them out on GitHub!

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s