From MQTT to InfluxDB with Dapr

In a previous post, we looked at using the Dapr InfluxDB component to write data to InfluxDB Cloud. In this post, we will take a look at reading data from an MQTT topic and storing it in InfluxDB. We will use Dapr 0.10, which includes both components.

To get up to speed with Dapr, please read the previous post and make sure you have an InfluxDB instance up and running in the cloud.

If you want to see a video instead:

MQTT to Influx with Dapr

Note that the video sends output to both InfluxDB and Azure SignalR. In addition, the video uses Dapr 0.8 with a custom compiled Dapr because I was still developing and testing the InfluxDB component.

MQTT Server

Although there are cloud-based MQTT servers you can use, let’s mix it up a little and run the MQTT server from Docker. If you have Docker installed, type the following:

docker run -it -p 1883:1883 -p 9001:9001 eclipse-mosquitto

The above command runs Mosquitto and exposes port 1883 on your local machine. You can use a tool such as MQTT Explorer to send data. Install MQTT Explorer on your local machine and run it. Create a connection as in the screenshot below:

MQTT Explorer connection

Now, click Connect to connect to Mosquitto. With MQTT, you send data to topics of your choice. Publish a JSON message to a topic called test as shown below:

Publish JSON data to the test topic

You can now click the topic in the list of topics and see its most recent value:

Subscribing to the test topic

Using MQTT with Dapr

You are now ready to read data from an MQTT topic with Dapr. If you have Dapr installed, you can run the following code to read from the test topic and store the data in InfluxDB:

const express = require('express');
const bodyParser = require('body-parser');

const app = express();
app.use(bodyParser.json());

const port = 3000;

// the mqtt component will post messages from the test topic here
app.post('/mqtt', (req, res) => {
    console.log("MQTT Binding Trigger");
    console.log(req.body);

    // body is expected to contain room and temperature
    const temperature = req.body.temperature;

    // room should not contain spaces
    const room = req.body.room.split(" ").join("_");

    // create message for influx component
    const message = {
        "measurement": "stat",
        "tags": `room=${room}`,
        "values": `temperature=${temperature}`
    };
    
    // send the message to influx output binding
    res.send({
        "to": ["influx"],
        "data": message
    });
});

app.listen(port, () => console.log(`Node App listening on port ${port}!`));

In this example, we use Node.js instead of Python to illustrate that Dapr works with any language. You also need the following package.json; save it next to the code and run npm install:

{
  "name": "mqttapp",
  "version": "1.0.0",
  "description": "",
  "main": "app.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "",
  "license": "ISC",
  "dependencies": {
    "body-parser": "^1.18.3",
    "express": "^4.16.4"
  }
}

In the previous post about InfluxDB, we used an output binding. You use an output binding by posting data to a Dapr HTTP URI.
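As a rough illustration of "posting data to a Dapr HTTP URI", here is a minimal Python sketch that only builds such a request. The sidecar port 3500, the binding name influx and the operation field are assumptions, not taken from this post; check them against the Dapr version you run.

```python
import json
import urllib.request

# Assumptions (not from the post): the Dapr sidecar listens on HTTP port 3500
# and the output binding component is named "influx".
DAPR_HTTP_PORT = 3500
BINDING_NAME = "influx"


def build_binding_request(data, port=DAPR_HTTP_PORT, name=BINDING_NAME):
    """Build (but do not send) a POST request for a Dapr output binding."""
    url = f"http://localhost:{port}/v1.0/bindings/{name}"
    # "operation" is assumed here; older Dapr releases may not expect it.
    body = json.dumps({"data": data, "operation": "create"}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    request = build_binding_request(
        {"measurement": "stat", "tags": "room=living_room", "values": "temperature=21.5"}
    )
    print(request.get_method(), request.full_url)
    # Sending needs a running sidecar: urllib.request.urlopen(request)
```

The request is only constructed, so the sketch runs without Dapr; uncomment the last line once a sidecar is available.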

To use an input binding like MQTT, you will need to create an HTTP server. Above, we create an HTTP server with Express, and listen on port 3000 for incoming requests. Later, we will instruct Dapr to listen for messages on an MQTT topic and, when a message arrives, post it to our server. We can then retrieve the message from the request body.

To tell Dapr what to do, we’ll create a components folder in the same folder that holds the Node.js code. Put a file in that folder with the following contents:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt
spec:
  type: bindings.mqtt
  metadata:
  - name: url
    value: mqtt://localhost:1883
  - name: topic
    value: test

Above, we configure the MQTT component to listen to topic test on mqtt://localhost:1883. The name we use (in metadata) is important because it needs to correspond to our HTTP handler (/mqtt).

Like in the previous post, there’s another file that configures the InfluxDB component:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: influx
spec:
  type: bindings.influx
  metadata:
  - name: Url
    value: http://localhost:9999
  - name: Token
    value: ""
  - name: Org
    value: ""
  - name: Bucket
    value: ""

Replace the parameters in the file above with your own.

Saving the MQTT request body to InfluxDB

If you look at the Node.js code, you have probably noticed that we send a response body in the /mqtt handler:

res.send({
        "to": ["influx"],
        "data": message
    });

Dapr is written to accept responses that include a to and a data field in the JSON response. The above response simply tells Dapr to send the message in the data field to the configured influx component.
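To make the shape of that response concrete, here is the same transformation as a small Python function. It is only a sketch that mirrors the Node.js handler above; the function name build_binding_response is made up for illustration.

```python
def build_binding_response(body):
    """Turn an MQTT message body into the response Dapr forwards to 'influx'."""
    # Spaces in the room name are replaced with underscores, as in the handler.
    room = str(body["room"]).replace(" ", "_")
    temperature = body["temperature"]
    message = {
        "measurement": "stat",
        "tags": f"room={room}",
        "values": f"temperature={temperature}",
    }
    # "to" lists the output bindings, "data" is what they receive.
    return {"to": ["influx"], "data": message}


if __name__ == "__main__":
    print(build_binding_response({"room": "living room", "temperature": 21.5}))
```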

Does it work?

Let’s run the code with Dapr to see if it works:

dapr run --app-id mqqtinflux --app-port 3000 --components-path=./components node app.js

In dapr run, we also need to specify the port our app uses. Remember that Dapr will post JSON data to our /mqtt handler!

Let’s post some JSON with the expected fields of temperature and room to our MQTT server:

Posting data to the test topic
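If you prefer a script over MQTT Explorer, the following Python sketch publishes the same kind of message. It assumes the third-party paho-mqtt package is installed and that Mosquitto runs on localhost:1883 as started earlier; the helper names are hypothetical.

```python
import json


def make_payload(room, temperature):
    """Serialize the two fields the /mqtt handler reads from the body."""
    return json.dumps({"room": room, "temperature": temperature})


def publish(payload, topic="test", host="localhost", port=1883):
    """Publish one message; requires the third-party paho-mqtt package."""
    # Imported lazily so make_payload works without paho-mqtt installed.
    import paho.mqtt.publish as mqtt_publish

    mqtt_publish.single(topic, payload=payload, hostname=host, port=port)


# Example call (needs a running broker):
# publish(make_payload("living room", 21.5))
```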

The Dapr logs show the following:

Logs from the APP (appear alongside the Dapr logs)

In InfluxDB Cloud table view:

Data stored in InfluxDB Cloud (posted some other data points before)

Conclusion

Dapr makes it really easy to retrieve data with input bindings and send that data somewhere else with output bindings. There are many other input and output bindings so make sure you check them out on GitHub!

Running a GoCV application in a container

In earlier posts (like here and here) I mentioned GoCV. GoCV allows you to use the popular OpenCV library from your Go programs. To avoid installing OpenCV and having to compile it from source, a container that runs your GoCV app can be beneficial. This post provides information about doing just that.

The following GitHub repository, https://github.com/denismakogon/gocv-alpine, contains all you need to get started. It’s for OpenCV 3.4.2 so you will run into issues when you want to use OpenCV 4.0. The pull request, https://github.com/denismakogon/gocv-alpine/pull/7, contains the update to 4.0 but it has not been merged yet. I used the proposed changes in the pull request to build two containers:

  • the build container: gbaeke/gocv-4.0.0-build
  • the run container: gbaeke/gocv-4.0.0-run

They are over on Docker Hub, ready for use. To actually use the above images in a typical two-step build, I used the following Dockerfile:

FROM gbaeke/gocv-4.0.0-build as build
RUN go get -u -d gocv.io/x/gocv
RUN go get -u -d github.com/disintegration/imaging
RUN go get -u -d github.com/gbaeke/emotion
RUN cd $GOPATH/src/github.com/gbaeke/emotion && go build -o $GOPATH/bin/emo ./main.go

FROM gbaeke/gocv-4.0.0-run
COPY --from=build /go/bin/emo /emo
ADD haarcascade_frontalface_default.xml /

ENTRYPOINT ["/emo"]

The above Dockerfile uses the webcam emotion detection program from https://github.com/gbaeke/emotion. To run it on a Linux system, use the following command:

docker run -it --rm --device=/dev/video0 --env SCOREURI="YOUR-SCORE-URI" --env VIDEO=0 gbaeke/emo

The SCOREURI environment variable needs to refer to the score URI offered by the ONNX FER+ container as discussed in Detecting Emotions with FER+. With VIDEO=0 the GUI window that shows the webcam video stream is turned off (required). Detected emotions will be logged to the console.

To be able to use the actual webcam of the host, the --device flag is used to map /dev/video0 from the host to the container. That works well on a Linux host and was tested on a laptop running Ubuntu 16.04.

Recognizing images with Azure Machine Learning and the ONNX ResNet50v2 model

Featured image from: https://medium.com/comet-app/review-of-deep-learning-algorithms-for-object-detection-c1f3d437b852

In a previous post, I discussed the creation of a container image that uses the ResNet50v2 model for image classification. If you want to perform tasks such as localization or segmentation, there are other models that serve that purpose. The image was built with GPU support. Adding GPU support was pretty easy:

  • Use the enable_gpu flag in the Azure Machine Learning SDK or check the GPU box in the Azure Portal; the service will build an image that supports NVIDIA CUDA
  • Add GPU support in your score.py file and/or conda dependencies file (scoring script uses the ONNX runtime, so we added the onnxruntime-gpu package)

In this post, we will deploy the image to a Kubernetes cluster with GPU nodes. We will use Azure Kubernetes Service (AKS) for this purpose. Check my previous post if you want to use NVIDIA V100 GPUs. In this post, I use hosts with one V100 GPU.

To get started, make sure you have the Kubernetes cluster deployed and that you followed the steps in my previous post to create the GPU container image. Make sure you attached the cluster to the workspace’s compute.

Deploy image to Kubernetes

Click the container image you created from the previous post and deploy it to the Kubernetes cluster you attached to the workspace by clicking + Create Deployment:

Starting the deployment from the image in the workspace

The Create Deployment screen is shown. Select AKS as deployment target and select the Kubernetes cluster you attached. Then press Create.

Azure Machine Learning now deploys the containers to Kubernetes. Note that I said containers in plural. In addition to the scoring container, another frontend container is added as well. You send your requests to the front-end container using HTTP POST. The front-end container talks to the scoring container over TCP port 5001 and passes the result back. The front-end container can be configured with certificates to support SSL.

Check the deployment and wait until it is healthy. We did not specify advanced settings during deployment so the default settings were chosen. Click the deployment to see the settings:

Deployment settings including authentication keys and scoring URI

As you can see, the deployment has authentication enabled. When you send your HTTP POST request to the scoring URI, make sure you pass an Authorization header like so: Bearer primary-or-secondary-key. The primary and secondary keys are in the settings above. You can regenerate those keys at any time.

Checking the deployment

From the Azure Cloud Shell, issue the following commands in order to list the pods deployed to your Kubernetes cluster:

  • az aks list -o table
  • az aks get-credentials -g RESOURCEGROUP -n CLUSTERNAME
  • kubectl get pods

Listing the deployed pods

Azure Machine Learning has deployed three front-ends (default; can be changed via Advanced Settings during deployment) and one scoring container. Let’s check the scoring pod with: kubectl get pod onnxgpu-5d6c65789b-rnc56 -o yaml. Replace the pod name with yours. In the output, you should find the following:

resources:
  limits:
    nvidia.com/gpu: "1"
  requests:
    cpu: 100m
    memory: 500m
    nvidia.com/gpu: "1"

The above allows the pod to use the GPU on the host. The NVIDIA drivers on the host are mapped to the pod with a volume:

volumeMounts:
- mountPath: /usr/local/nvidia
  name: nvidia

Great! We did not have to bother with doing this ourselves. Let’s now try to recognize an image by sending requests to the front-end pods.

Recognizing images

To recognize an image, we need to POST a JSON payload to the scoring URI. The scoring URI can be found in the deployment properties in the workspace. In my case, the URI is:

http://23.97.218.34/api/v1/service/onnxgpu/score

The JSON payload needs to be in the below format:

{"data": [[[[143.06100463867188, 130.22100830078125, 122.31999969482422, ... ]]]]} 

The data field is a multi-dimensional array, serialized to JSON. The shape of the array is (1,3,224,224). The dimensions correspond to the batch size, channels (RGB), height and width.

You only have to read an image and put the pixel values in the array! Easy, right? Well, as usual the answer is: “it depends”! The easiest way to do it, in my opinion, is with Python and a collection of helper packages. The code is in the following GitHub gist: https://gist.github.com/gbaeke/b25849f3813e9eb984ee691659d1d05a. You need to run the code on a machine with Python 3 installed. Make sure you also install Keras and NumPy (pip3 install keras / pip3 install numpy). The code uses two images, cat.jpg and car.jpg, but you can use your own. When I run the code, I get the following result:

Using TensorFlow backend.
channels_last
Loading and preprocessing image… cat.jpg
Array shape (224, 224, 3)
Array shape afer moveaxis: (3, 224, 224)
Array shape after expand_dims (1, 3, 224, 224)
prediction time (as measured by the scoring container) 0.025304794311523438
Probably a: Egyptian_cat 0.9460222125053406
Loading and preprocessing image… car.jpg
Array shape (224, 224, 3)
Array shape afer moveaxis: (3, 224, 224)
Array shape after expand_dims (1, 3, 224, 224)
prediction time (as measured by the scoring container) 0.02526378631591797
Probably a: sports_car 0.948998749256134

It takes about 25 milliseconds to classify an image, or 40 images/second. By increasing the number of GPUs and scoring containers (we only deployed one), we can easily scale out the solution.

With a bit of help from Keras and NumPy, the code does the following:

  • check the image format reported by the Keras back-end: it reports channels_last which means that, by default, the RGB channels are the last dimension of the image array
  • load the image; the resulting array has a (224,224,3) shape
  • our container expects the channels_first format; we use moveaxis to move the last axis to the front; the array now has a (3,224,224) shape
  • our container expects a first dimension with a batch size; we use expand_dims to end up with a (1,3,224,224) shape
  • we convert the 4D array to a list and construct the JSON payload
  • we send the payload to the scoring URI and pass an authorization header
  • we get a JSON response with two fields: result and time; we print the inference time as reported by the container
  • from keras.applications.resnet50, we use the decode_predictions function to process the result field; result contains the 1000 values computed by the softmax function in the container; decode_predictions knows the categories and returns the first five
  • we print the name and probability of the category with the highest probability (item 0)
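The array handling in the steps above can be sketched with NumPy alone. This is not the code from the gist: it uses a random array as a stand-in for a loaded and preprocessed image, and the helper names, the example URI and the key are hypothetical.

```python
import json
import urllib.request

import numpy as np


def to_payload(image_hwc):
    """Convert a (224, 224, 3) channels_last array into the JSON payload."""
    chw = np.moveaxis(image_hwc, -1, 0)   # (3, 224, 224): channels_first
    batch = np.expand_dims(chw, axis=0)   # (1, 3, 224, 224): batch of one
    return json.dumps({"data": batch.tolist()})


def build_score_request(scoring_uri, key, payload):
    """Build (but do not send) the authenticated POST request."""
    return urllib.request.Request(
        scoring_uri,
        data=payload.encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {key}",
        },
        method="POST",
    )


if __name__ == "__main__":
    # Stand-in for a real image: random pixel values, not a preprocessed photo.
    fake_image = np.random.rand(224, 224, 3).astype("float32")
    payload = to_payload(fake_image)
    print(np.array(json.loads(payload)["data"]).shape)
```

Sending the request with urllib.request.urlopen and decoding the result with decode_predictions is left out, since both need a live deployment and Keras.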

What happens when you use a scoring container that uses the CPU? In that case, you could run the container in Azure Container Instances (ACI). Using ACI is much less costly! In ACI with the default setting of 0.1 CPU, it will take around 2 seconds to score an image. Ouch! With a full CPU (in ACI), the scoring time goes down to around 180-220ms per image. To achieve better results, simply increase the number of CPUs. On the Standard_NC6s_v3 Kubernetes node with 6 cores, scoring time with CPU hovers around 60ms.
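To compare these options, it helps to convert each latency into throughput for a single container. The sketch below does just that with the figures quoted in this post; 0.2 s for a full ACI CPU is my rounding of the 180-220ms range.

```python
def images_per_second(seconds_per_image):
    """Throughput of one container that scores one image at a time."""
    return 1.0 / seconds_per_image


# Latencies quoted in the post; the CPU figures are approximate.
latencies = {
    "GPU (V100)": 0.025,
    "ACI, 0.1 CPU": 2.0,
    "ACI, 1 CPU": 0.2,
    "NC6s_v3 node, CPU scoring": 0.06,
}

if __name__ == "__main__":
    for name, seconds in latencies.items():
        print(f"{name}: {images_per_second(seconds):.1f} images/s")
```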

Conclusion

In this post, you have seen how Azure Machine Learning makes it straightforward to deploy GPU scoring images to a Kubernetes cluster with GPU nodes. The service automatically configures the resource requests for the GPU and maps the NVIDIA drivers to the scoring container. The only thing left to do is to start scoring images with the service. We have seen how easy that is with a bit of help from Keras and NumPy. In practice, always start with CPU scoring and scale out that solution to match your requirements. But if you do need GPUs for scoring, Azure Machine Learning makes it pretty easy to do so!

Draft: a simpler way to deploy to Kubernetes during development

If you work with containers and work with Kubernetes, Draft makes it easier to deploy your code while you are in the earlier development stages. You use Draft while you are working on your code but before you commit it to version control. The idea is simple:

  • You have some code written in something like Node.js, Go or another supported language
  • You then use draft create to containerize the application based on Draft packs; several packs come with the tool and provide a Dockerfile and a Helm chart depending on the development language
  • You then use draft up to deploy the application to Kubernetes; the application is made accessible via a public URL

Let’s demonstrate how Draft is used, based on a simple Go application that is just a bit more complex than the Go example that comes with Draft. I will use the go-data service that I blogged about earlier. You can find the source code on GitHub. The go-data service is a very simple REST API. By calling the endpoint /data/{deviceid}, it will check if a “device” exists and then actually return no data. Hey, it’s just a sample! The service uses the Gorilla router but also Go Micro to call a device service running in the Kubernetes cluster. If the device service does not run, the data service will just report that the device does not exist.

Note that this post does not cover how to install Draft and its prerequisites like Helm and a Kubernetes Ingress Controller. You will also need a Kubernetes cluster (I used Azure ACS) and a container registry (I used Docker Hub). I installed all client-side components in the Windows 10 Linux shell which works great!

The only thing you need on your development box that has Helm and Draft installed is main.go and an empty glide.yaml file. The first command to run is draft create.

This results in several files and folders being created, based on the Golang Draft pack. Draft detected you used Go because of glide.yaml. No Docker container is created at this point.

  • Dockerfile: a simple Dockerfile that builds an image based on the golang:onbuild image
  • draft.toml: the Draft configuration file that contains the name of the application (set randomly), the namespace to deploy to and if the folder needs to be watched for changes after you do draft up
  • chart folder: contains the Helm chart for your application; you might need to make changes here if you want to modify the Kubernetes deployment as we will do soon

When you deploy, Draft will do several things. It will package up the chart and your code and send it to the Draft server-side component running in Kubernetes. It will then instruct Draft to build your container, push it to a configured registry and then install the application in Kubernetes. All those tasks are performed by the Draft server component, not your client!

In my case, after running draft up, I get the following on my prompt (after the build, push and deploy steps):

image

In my case, the name of the application was set to exacerbated-ragdoll (in draft.toml). Part of what makes Draft so great is that it then makes the service available using that name and the configured domain. That works because of the following:

  • During installation of Draft, you need to configure an Ingress Controller in Kubernetes; you can use a Helm chart to make that easy; the Ingress Controller does the magic of mapping the incoming request to the correct application
  • When you configure Draft for the first time with draft init you can pass the domain (in my case baeke.info); this requires a wildcard A record (e.g. *.baeke.info) that points to the public IP of the Ingress Controller; note that in my case, I used Azure Container Services which makes that IP the public IP of an Azure load balancer that load balances traffic between the Ingress Controller instances (nginx)

So, with only my source code and a few simple commands, the application was deployed to Kubernetes and made available on the Internet! There is only one small problem here. If you check my source code, you will see that there is no route for /. The Draft pack for Golang includes a livenessProbe on / and a readinessProbe on /. The probes are in deployment.yaml which is the file that defines the Kubernetes deployment. You will need to change the path in livenessProbe and readinessProbe to point to /data/device like so:

- containerPort: {{ .Values.service.internalPort }}
livenessProbe:
  httpGet:
    path: /data/device
    port: {{ .Values.service.internalPort }}
readinessProbe:
  httpGet:
    path: /data/device
    port: {{ .Values.service.internalPort }}

If you already deployed the application but Draft is still watching the folder, you can simply make the above changes and save the deployment.yaml file (in chart/templates). The container will then be rebuilt and the deployment will be updated. When you now check the service with curl, you should get something like:

curl http://exacerbated-ragdoll.baeke.info/data/device1

Device active:  false
Oh and, no data for you!

To actually make the Go Micro features work, we will have to make another change to deployment.yaml. We will need to add an environment variable that instructs our code to find other services developed with Go Micro using the kubernetes registry:

- name: {{ .Chart.Name }}
  image: "{{ .Values.image.registry }}/{{ .Values.image.org }}/{{ .Values.image.name }}:{{ .Values.image.tag }}"
  imagePullPolicy: {{ .Values.image.pullPolicy }}
  env:
   - name: MICRO_REGISTRY
     value: kubernetes

To actually test this, use the following command to deploy the device service.

kubectl create -f https://raw.githubusercontent.com/gbaeke/go-device/master/go-device-dep.yaml

You can then check if it works by running the curl command again. It should now return the following:

Device active:  true
Oh and, no data for you!

Hopefully, you have seen how you can work with Draft from your development box and that you can modify the files generated by Draft to control how your application gets deployed. In our case, we had to modify the health checks to make sure the service can be reached. In addition, we had to add an environment variable because the code uses the Go Micro microservices framework.

IoT Hub Scaling

When you work with Azure IoT Hub, it is not always easy to tell what will happen when you reach the limits of IoT Hub and what to do when you reach those limits. As a reminder, recall that the scale of IoT Hub is defined by its tier and the number of units in the tier. There are three paid tiers, besides the free tier:

image

Although these tiers make it clear how many messages you can send, other limits such as the number of messages per second cannot be seen here. To get an idea of the number of messages you can send and the sustained throughput, see https://azure.microsoft.com/en-us/documentation/articles/iot-hub-scaling/#device-to-cloud-and-cloud-to-device-message-throughput.

The specific burst performance numbers can be found here: https://azure.microsoft.com/en-us/documentation/articles/iot-hub-devguide-quotas-throttling/. Typically, the limit you are concerned with is the number of device-to-cloud sends, which is as follows:

  • S1: 12/sec/unit, with a minimum of 100/sec in total (for the hub, not per unit); 10 units give you 120/sec, not 100+120/sec
  • S2: 120/sec/unit
  • S3: 6000/sec/unit

Now suppose you plan to deploy 300 devices that each send data every half second. What tier should you use and how many units? Each device sends 2 messages per second, so you need 600 messages per second in total, and 5 units of S2 (5 × 120/sec) will suffice. You could also take 50 units of S1 (50 × 12/sec) for the same performance and price. With 5 units of S2 though, you can send more messages.
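The sizing above is simple enough to script. The sketch below uses only the per-unit send rates listed earlier; it ignores the 100/sec minimum for S1 and the daily message quota of each tier, and the function names are made up for illustration.

```python
import math

# Device-to-cloud send rates per unit, as listed above (messages/sec/unit).
RATE_PER_UNIT = {"S1": 12, "S2": 120, "S3": 6000}


def required_rate(devices, interval_seconds):
    """Messages per second when every device sends once per interval."""
    return devices / interval_seconds


def units_needed(tier, messages_per_second):
    """Smallest number of units whose combined rate covers the load."""
    return math.ceil(messages_per_second / RATE_PER_UNIT[tier])


if __name__ == "__main__":
    rate = required_rate(devices=300, interval_seconds=0.5)
    for tier in RATE_PER_UNIT:
        print(tier, units_needed(tier, rate))
```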

Now it would be nice to test the above in advance. At ThingTank we use Docker containers for this and we schedule them with Rancher, a great and easy to use Docker orchestration tool. If you want to try it, just use the container you can find on Docker Hub or the new Docker Store (still in beta). Just search for gbaeke and you will find the following container:

image

If you want to check out the code (warning: written hastily!), you can find it on GitHub here: https://github.com/xyloscloudservices/docker-itproceed. It is a simple Node.js script that uses the Azure IoT Hub libraries to create a new device in the registry with a GUID for the name. Afterwards, the code sends a simple JSON payload to IoT Hub every half a second.

To use the script, start it as follows with three parameters:

app.js IoT_Hub_Short_Name IoT_Hub_Connection_String millis

Note: the millis parameter is the number of milliseconds to wait between sends

Now you can run the containers in Rancher (for instance). I won’t go into the details of how to add Docker Hosts to Rancher and how to create a new Stack (as they call it). Alternatively, you can run the containers on Azure Container Service or similar solutions.

In the Power BI chart below, you see the eventcount per five seconds: around 420-440 events, a bit lower than the roughly 500 (100/sec for five seconds) you would expect for one S1 unit:

image

Note: the spike you see happens after the launch of 300 containers; throttling quickly kicks in

When switched to 5 S2 units, the graph looks as follows:

image

You see the eventcount jump to 3000 (near the end), which is what you would expect: 300 containers send 600 messages per second, or 3000 messages per 5 seconds, which is possible with 5 S2 units that deliver 120 messages/sec/unit.

You really need to consider whether you want to send data every half second or every second. For our ThingTank Air Quality solution, we take measurements every second but aggregate them over a minute at the edge. Sending every minute with 5 S2 units would amount to thousands of devices before you reach the limits of IoT Hub!
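As a back-of-the-envelope check of that claim, the sketch below computes how many devices a hub can sustain at a given send interval. It considers only the per-second send rate, not the daily message quota of the tier, and the function names are hypothetical.

```python
def max_devices(units, rate_per_unit, interval_seconds):
    """Devices a hub can sustain if each sends one message per interval."""
    return int(units * rate_per_unit * interval_seconds)


def events_per_window(messages_per_second, window_seconds=5):
    """Event count you would expect to see per chart window."""
    return messages_per_second * window_seconds


if __name__ == "__main__":
    print(max_devices(5, 120, 0.5))   # sending every half second
    print(max_devices(5, 120, 60))    # sending once per minute
    print(events_per_window(600))     # 5 S2 units, 5-second window
```

With 5 S2 units at 120 messages/sec/unit, one message per minute per device works out to 36000 devices, against 300 devices at two messages per second.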
