A while ago, I created an Azure Virtual WAN (Standard) and added a virtual hub. For some reason, the virtual hub ended up in the state below:
Hub and routing status: Failed (Ouch!)
I tried to reset the router and the virtual hub, but to no avail. Next, I tried to delete the hub. In the portal, this resulted in a validating state that never finished. In the Azure CLI, an error was thrown.
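For reference, the CLI delete attempt looked something like the command below. This is just a sketch: it assumes the virtual-wan Azure CLI extension is installed and uses placeholder resource names.

az network vhub delete --resource-group <resource-group> --name <hub-name>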
Because this is a Microsoft Partner Network (MPN) subscription, I also did not have technical support or an easy way to enable it. I ended up buying Developer Support for a month just to open a service request.
The (helpful) support engineer asked me to do the following in Azure Resource Explorer (https://resources.azure.com):
Navigate to subscriptions and the resource group that contains the hub
Under providers, navigate to Microsoft.Network
Locate the virtual hub and do GET, EDIT, PUT (set Read/Write mode first)
After you click GET and EDIT, PUT becomes available
At first it did not seem to work but in my case, the PUT operation just took a very long time. After the PUT operation finished, I could delete the virtual hub from the portal.
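For reference, the same GET/PUT sequence can be scripted with az rest instead of clicking through Azure Resource Explorer. The sketch below assumes your own subscription, resource group and hub name, and a current api-version:

# GET the virtual hub resource and save the response body
az rest --method get \
  --url "https://management.azure.com/subscriptions/<sub-id>/resourceGroups/<rg>/providers/Microsoft.Network/virtualHubs/<hub-name>?api-version=2020-05-01" > hub.json

# PUT the resource back unchanged to make the provider reconcile it
az rest --method put \
  --url "https://management.azure.com/subscriptions/<sub-id>/resourceGroups/<rg>/providers/Microsoft.Network/virtualHubs/<hub-name>?api-version=2020-05-01" \
  --body @hub.json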
Long story short: if you ever have a resource you cannot delete, give Azure Resource Explorer and the above procedure a try. Your mileage may vary though!
In a previous post, we looked at using the Dapr InfluxDB component to write data to InfluxDB Cloud. In this post, we will take a look at reading data from an MQTT topic and storing it in InfluxDB. We will use Dapr 0.10, which includes both components.
To get up to speed with Dapr, please read the previous post and make sure you have an InfluxDB instance up and running in the cloud.
If you want to see a video instead:
MQTT to Influx with Dapr
Note that the video sends output to both InfluxDB and Azure SignalR. In addition, the video uses a custom-compiled Dapr 0.8 because I was still developing and testing the InfluxDB component.
MQTT Server
Although there are cloud-based MQTT servers you can use, let’s mix it up a little and run the MQTT server from Docker. If you have Docker installed, type the following:
docker run -it -p 1883:1883 -p 9001:9001 eclipse-mosquitto
The above command runs Mosquitto and exposes port 1883 on your local machine. You can use a tool such as MQTT Explorer to send data. Install MQTT Explorer on your local machine, run it and create a connection as in the screenshot below:
MQTT Explorer connection
Now, click Connect to connect to Mosquitto. With MQTT, you send data to topics of your choice. Publish a JSON message to a topic called test as shown below:
Publish json data to the test topic
You can now click the topic in the list of topics and see its most recent value:
Subscribing to the test topic
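If you prefer the command line over MQTT Explorer, the Mosquitto client tools can publish the same message (assuming mosquitto-clients is installed on your machine):

mosquitto_pub -h localhost -t test -m '{"room": "room 1", "temperature": 20.5}'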
Using MQTT with Dapr
You are now ready to read data from an MQTT topic with Dapr. If you have Dapr installed, you can run the following code to read from the test topic and store the data in InfluxDB:
const express = require('express');
const bodyParser = require('body-parser');

const app = express();
app.use(bodyParser.json());

const port = 3000;

// the mqtt component will post messages from the test topic here
app.post('/mqtt', (req, res) => {
    console.log("MQTT Binding Trigger");
    console.log(req.body);

    // the body is expected to contain room and temperature
    let room = req.body.room;
    const temperature = req.body.temperature;

    // room should not contain spaces
    room = room.split(" ").join("_");

    // create the message for the influx component
    const message = {
        "measurement": "stat",
        "tags": `room=${room}`,
        "values": `temperature=${temperature}`
    };

    // send the message to the influx output binding
    res.send({
        "to": ["influx"],
        "data": message
    });
});

app.listen(port, () => console.log(`Node App listening on port ${port}!`));
In this example, we use Node.js instead of Python to illustrate that Dapr works with any language. You will also need a package.json that pulls in express and body-parser; a minimal sketch is shown below (run npm install after creating it):
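{
  "name": "mqttinflux",
  "version": "1.0.0",
  "main": "app.js",
  "scripts": {
    "start": "node app.js"
  },
  "dependencies": {
    "body-parser": "^1.19.0",
    "express": "^4.17.1"
  }
}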
In the previous post about InfluxDB, we used an output binding. You use an output binding by posting data to a Dapr HTTP URI.
To use an input binding like MQTT, you will need to create an HTTP server. Above, we create an HTTP server with Express, and listen on port 3000 for incoming requests. Later, we will instruct Dapr to listen for messages on an MQTT topic and, when a message arrives, post it to our server. We can then retrieve the message from the request body.
To tell Dapr what to do, we’ll create a components folder in the same folder that holds the Node.js code. Put a file in that folder with the following contents:
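# minimal sketch of the MQTT input binding component for Dapr 0.10;
# the component name (mqtt) must match the route of our HTTP handler (/mqtt)
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt
spec:
  type: bindings.mqtt
  metadata:
  - name: url
    value: mqtt://localhost:1883
  - name: topic
    value: test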
Above, we configure the MQTT component to listen on topic test at mqtt://localhost:1883. The name we use in metadata is important because it needs to correspond to our HTTP handler (/mqtt).
Like in the previous post, there’s another file that configures the InfluxDB component:
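# minimal sketch of the InfluxDB output binding component;
# replace the placeholder values with your own
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: influx
spec:
  type: bindings.influx
  metadata:
  - name: url
    value: YOUR_URL
  - name: token
    value: YOUR_TOKEN
  - name: org
    value: YOUR_ORG
  - name: bucket
    value: YOUR_BUCKET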
Replace the parameters in the file above with your own.
Saving the MQTT request body to InfluxDB
If you look at the Node.js code, you have probably noticed that we send a response body in the /mqtt handler:
res.send({
    "to": ["influx"],
    "data": message
});
Dapr accepts responses that include a to and a data field in the JSON body. The above response simply tells Dapr to send the message in the data field to the configured influx component.
Does it work?
Let’s run the code with Dapr to see if it works:
dapr run --app-id mqttinflux --app-port 3000 --components-path=./components node app.js
In dapr run, we also need to specify the port our app uses. Remember that Dapr will post JSON data to our /mqtt handler!
Let’s post some JSON with the expected fields of temperature and room to our MQTT server:
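For example, a message like this (any values will do):

{
    "room": "room 1",
    "temperature": 21.5
}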
Posting data to the test topic
The Dapr logs show the following:
Logs from the APP (appear alongside the Dapr logs)
In InfluxDB Cloud table view:
Data stored in InfluxDB Cloud (posted some other data points before)
Conclusion
Dapr makes it really easy to retrieve data with input bindings and send that data somewhere else with output bindings. There are many other input and output bindings so make sure you check them out on GitHub!
A while ago, I created a component that can write to InfluxDB 2.0 from Dapr. This component is now included in the 0.10 release. In this post, we will briefly look at how you can use it.
If you do not know what Dapr is, take a look at https://dapr.io. I also have some videos on YouTube about Dapr. And be sure to check out the video below as well:
Let’s jump in and use the component.
Installing Dapr
You can install Dapr on Windows, Mac and Linux by following the instructions on https://dapr.io/. Just click the Download link and select your operating system. I installed Dapr on WSL 2 (Windows Subsystem for Linux) on Windows 10 with the following command:
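# Dapr CLI install script for Linux, as documented on https://dapr.io
wget -q https://raw.githubusercontent.com/dapr/cli/master/install/install.sh -O - | /bin/bash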
The above command just installs the Dapr CLI. To initialize Dapr, you need to run dapr init.
Getting an InfluxDB database
InfluxDB is a time-series database. You can easily run it in a container on your local machine, but you can also use InfluxDB Cloud. In this post, we will simply use a free cloud instance. Head over to https://cloud2.influxdata.com/signup and sign up for an account. Follow the steps and choose the free plan. It stores data for a maximum of 30 days and has some other limits as well.
You will need the following information to write data to InfluxDB:
Organization: this will be set to the e-mail account you signed up with; it can be renamed if you wish
Bucket: your data is stored in a bucket; by default you get a bucket called e-mail-prefix’s Bucket (e.g. geert.baeke’s Bucket)
Token: you need a token that provides the necessary access rights such as read and/or write
Let’s rename the bucket to get a feel for the user interface. Click Data, Buckets and then Settings as shown below:
Getting to the bucket settings
Click Rename and follow the steps to rename the bucket:
Renaming the bucket
Now, let’s create a token. In the Load Data screen, click Tokens. Click Generate and then click Read/Write Token. Describe the token and create it like below:
Creating a token
Now click the token you created and copy it to the clipboard. You now have the organization name, a bucket name and a token. You still need a URL to connect to, but that is just the URL you see in the browser (the yellow part):
URL to send your data
Your URL will depend on the cloud you use.
Python code to write to InfluxDB with Dapr
The code below requires Python 3. I used version 3.6.9 but it will work with more recent versions of course.
import time
import requests
import os

dapr_port = os.getenv("DAPR_HTTP_PORT", 3500)
dapr_url = "http://localhost:{}/v1.0/bindings/influx".format(dapr_port)

n = 0.0
while True:
    n += 1.0

    # payload for the influx output binding: measurement, tags and values
    # (tags and values use the InfluxDB line protocol syntax)
    payload = {
        "data": {
            "measurement": "temp",
            "tags": "room=dorm,building=building-a",
            "values": "sensor=\"sensor X\",avg={},max={}".format(n, n*2)
        },
        "operation": "create"
    }
    print(payload, flush=True)

    try:
        # post the payload to the Dapr sidecar, which forwards it to InfluxDB
        response = requests.post(dapr_url, json=payload)
        print(response, flush=True)
    except Exception as e:
        print(e, flush=True)

    time.sleep(1)
The code above is just an illustration of using the InfluxDB output binding from Dapr. It is crucial to understand that the program communicates with a Dapr process, which needs to be running either locally on your system or as a Kubernetes sidecar. To that end, we get the Dapr port number from an environment variable or use the default port 3500.
The Python program uses the InfluxDB output binding simply by posting data to an HTTP endpoint. The endpoint is constructed as follows:
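dapr_port = os.getenv("DAPR_HTTP_PORT", 3500)
dapr_url = "http://localhost:{}/v1.0/bindings/influx".format(dapr_port)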
The dapr_url above is set to a URI that uses localhost over the Dapr port and then uses the influx binding by appending /v1.0/bindings/influx. All bindings have a specific name like influx, mqtt, etc… and that name is then added to /v1.0/bindings/ to make the call work.
So far so good, but how does the binding know where to connect and which organization, bucket and token to use? That’s where the component .yaml file comes in. In the same folder where you save your Python code, create a folder called components. In that folder, create a file called influx.yaml (you can give it any name you want). The contents of influx.yaml are shown below:
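# sketch of influx.yaml for the Dapr 0.10 InfluxDB output binding;
# replace the uppercase values with your own
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: influx
spec:
  type: bindings.influx
  metadata:
  - name: url
    value: YOUR_URL
  - name: token
    value: YOUR_TOKEN
  - name: org
    value: YOUR_ORG
  - name: bucket
    value: YOUR_BUCKET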
Of course, replace the uppercase values above with your own. We will later tell Dapr to look for files like this in the components folder. Because the Python code posts to the influx binding, Dapr automatically looks for the component with that name (type: bindings.influx) and retrieves the required metadata. If any of the metadata is not set, or if the file is missing or improperly formatted, you will get an error.
To actually use the binding, we need to post some data to the URI we constructed. The data we send is in the payload variable as shown below:
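payload = {
    "data": {
        "measurement": "temp",
        "tags": "room=dorm,building=building-a",
        "values": "sensor=\"sensor X\",avg={},max={}".format(n, n*2)
    },
    "operation": "create"
}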
It requires a measurement field, a tags field and a values field, and uses the InfluxDB line protocol syntax for tags and values. You can find more information about that here.
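For instance, with n equal to 1.0, the payload above roughly corresponds to this line protocol entry:

temp,room=dorm,building=building-a sensor="sensor X",avg=1.0,max=2.0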
The data field in the payload is specific to the Influx component. The operation field is required by this Dapr component as it is written to listen for create operations.
Running the code
On your local machine, you will need to run Dapr together with your code to make it work. You use dapr run for this. To run the Python code (saved to app.py in my case), run the command below from the folder that contains the code and the components folder:
dapr run --app-id influx -d ./components python3 app.py
This starts Dapr and our application with app id influx. With -d, we point to the components folder.
When you run the code, Dapr logs and your logs will be printed to the screen. In InfluxDB Cloud, we can check the data from the user interface:
Data Explorer (note: a different organization and bucket than the ones used in this post)
Conclusion
Dapr can be used in the cloud and at the edge, in containers or without. In both cases, you often have to write data to databases. With Dapr, you can now easily write data as time series to InfluxDB. Note that Dapr also has an MQTT input and output binding. Using the same simple technique you learned in this post, you can easily read data from an MQTT topic and forward it to InfluxDB. In a later post, we will take a look at that scenario as well. Or check this video instead: https://youtu.be/2vCT79KG24E. Note that the video uses a custom compiled Dapr 0.8 with the InfluxDB component because this video was created during development.