Improving an Azure Function that writes IoT Hub data to TimescaleDB

In an earlier post, I used an Azure Function to write data from IoT Hub to a TimescaleDB hypertable on PostgreSQL. Although that function works for demo purposes, there are several issues. Two of those issues will be addressed in this post:

  1. the INSERT INTO statement used the NOW() function instead of the enqueuedTimeUtc field; that field is provided by IoT Hub and represents the time the message was enqueued
  2. the INSERT INTO query does not use upsert functionality; if for some reason you need to process the IoT Hub data again, you will end up with duplicate data; your code should be idempotent

Using enqueuedTimeUtc

Using the time the event was enqueued means we need to retrieve that field from the message that our Azure Function receives. The Azure Function receives outside information via two parameters: context and eventHubMessage. The enqueuedTimeUtc field is retrieved via the context variable: context.bindingData.enqueuedTimeUtc.

In the INSERT INTO statement, we need to use TIMESTAMP 'UTC time'. In JavaScript, that results in the following:

'insert into conditions(time, device, temperature, humidity) values(TIMESTAMP \'' + context.bindingData.enqueuedTimeUtc + '\',\'' + eventHubMessage.device + '\' ...

Using upsert functionality

Before adding upsert functionality, add a unique constraint to the hypertable like so (via pgAdmin):

CREATE UNIQUE INDEX on conditions (time, device); 

It needs to be on time and device because the time field on its own is not guaranteed to be unique. Now modify the INSERT INTO statement like so:

'insert into conditions(time, device, temperature, humidity) values(TIMESTAMP \'' + context.bindingData.enqueuedTimeUtc + '\',\'' + eventHubMessage.device + '\',' + eventHubMessage.temperature + ',' + eventHubMessage.humidity + ') ON CONFLICT DO NOTHING'; 

Notice the ON CONFLICT clause? When any constraint is violated, we do nothing. We do not add or modify data, we leave it all as it was.
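The statement above is built by string concatenation. As a hedged alternative, the same upsert can be written with placeholders so the driver handles quoting. The sketch below is in Go rather than the JavaScript of the function itself; buildUpsert, its signature and the sample values are hypothetical. Because the TIMESTAMP '...' form only works with literals, the placeholder is cast with ::timestamp instead.

```go
package main

import "fmt"

// buildUpsert returns a parameterized INSERT with ON CONFLICT DO NOTHING
// together with its arguments. The name and signature are hypothetical;
// the table and columns are the ones used in this post.
func buildUpsert(enqueuedTimeUtc, device string, temperature, humidity float64) (string, []interface{}) {
	query := "insert into conditions(time, device, temperature, humidity) " +
		"values($1::timestamp, $2, $3, $4) ON CONFLICT DO NOTHING"
	return query, []interface{}{enqueuedTimeUtc, device, temperature, humidity}
}

func main() {
	// Sample values, made up for illustration.
	query, args := buildUpsert("2019-03-17T10:15:30.123Z", "pg-1", 21.5, 48.2)
	fmt.Println(query)
	fmt.Println(args...)
}
```

The query text and the argument slice would then be handed to whatever PostgreSQL driver is in use; most drivers accept $1-style placeholders.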

The full Azure Function code is below:

Azure Function code with IoT Hub enqueuedTimeUtc and upsert

Conclusion

The above code is a little bit better already. We are not quite there yet but the two changes make sure that the date of the event is correct and independent from when the actual processing is done. By adding the constraint and upsert functionality, we make sure we do not end up with duplicate data when we reprocess data from IoT Hub.

Dashboard your TimescaleDB data with Grafana

In an earlier post, I looked at storing time-series data with TimescaleDB on Azure Database for PostgreSQL. To visualize your data, there are many options as listed here. Because TimescaleDB is built on PostgreSQL, you can use any tool that supports PostgreSQL such as Power BI or Tableau.

Grafana is a bit of a special case because TimescaleDB engineers actually built the data source, which is designed to take advantage of the time-series capabilities. For a detailed overview of the capabilities of the data source, see the Grafana documentation.

Let’s take a look at a simple example to get started. I have a hypertable called conditions with four columns: time, device, temperature, humidity. An IoT Simulator is constantly writing data for five devices: pg-1 to pg-5.

On a multi-tier deployment of Grafana, I added the PostgreSQL data source:

PostgreSQL data source in Grafana

One setting in the data source is particularly noteworthy:

TimescaleDB support in the PostgreSQL datasource

Grafana has the concept of macros such as $__timeGroup or $__interval, as noted in the preceding image. The macro is translated to what the underlying data source supports. In this case, with TimescaleDB enabled, the macro results in the use of time_bucket, which is specific to TimescaleDB.

Creating a dashboard

Create a dashboard from the main page:

Creating a new dashboard

You will get a new dashboard with an empty panel:

Click Add Query. You will notice Grafana proposes a query. In this case it is very close because we only have one data source and table:

Grafana proposes the following query

Let’s modify this a bit. In the top right corner, I switched the time interval to last 30 minutes. Because the default query uses WHERE Macro: $__timeFilter, only the last 30 minutes will be shown. That’s another example of a macro. I would like to show the average temperature over 10 second intervals. That is easy to do with a GROUP BY and $__interval. In GROUP BY, click the + and type or select time to use the time field. You will notice the following:

GROUP BY with $__interval

Just click $__interval and select 10s. Now add the humidity column to the SELECT statement:

Adding humidity

When you click the Generated SQL link, you will see the query built by the query builder:

Generated SQL

Notice that the query uses time_bucket. The GROUP BY 1 and ORDER BY 1 simply mean group and order by the first field, which is the time_bucket. If the query builder is not sufficient, you can click Edit SQL and specify your query directly. When you switch back to query builder, your custom SQL statement might be overwritten if the builder does not support it.

When you save your dashboard, you should see something like:

Pretty boring temperature and humidity graph

Now, let’s add a few gauges. In the top right row of icons, the first one should be Add panel. Choose the Gauge visualization and set your query:

Temperature Gauge

In Visualization, set Stat to Current:

Stat field on current

When the panel is finished, navigate back to the dashboard and duplicate the gauge. Modify the duplicated gauge to show humidity. Also change the titles. The dashboard now looks like:

Conditions dashboard

Grafana can be configured to auto refresh the dashboard. In the image below, refresh was set to every 5 seconds:

Setting auto refresh

Your dashboard will now update every 5 seconds for a more dynamic experience.

Joins

You can join hypertables with regular tables quite easily. This is one of the advantages of using a relational database such as PostgreSQL for your time-series data. The screenshot below shows a graph of the temperature per device location. The device location is stored in a regular table.

Join between hypertable and regular table: they are all just tables in the end

Here is the full dashboard:

Conclusion

Grafana, in combination with PostgreSQL and TimescaleDB, is a flexible solution for dashboarding your IoT time-series data. We have only scratched the surface here but it’s clear you can be up and running fast! Give it a go and tell me what you think in the comments or via @geertbaeke!

Multi-Tier Bitnami Grafana Stack on Azure

After seeing some tweets about Bitnami’s multi-tier Grafana Stack, I decided to give it a go. On the page describing the Grafana stack, there are several deployment offerings:

Grafana deployment offerings (Image: from Bitnami website)

I decided to use the multi-tier deployment, which deploys multiple Grafana nodes and a shared Azure Database for MariaDB.

On Azure, the Grafana stack is deployed via an Azure Resource Manager (ARM) template. You can easily find it via the Azure Marketplace:

Grafana multi-tier in Azure Marketplace

From the above page, click Create to start deploying the template. You will get a series of straightforward questions such as the resource group, the Grafana admin password, MariaDB admin password, virtual machine size, etc…

It will take about half an hour to deploy the template. When finished, you will find the following resources in the resource group you chose or created during deployment:

Deployed Grafana resources

Let’s take a look at the deployed resources. The database back-end is Azure Database for MariaDB server. The deployment uses a General Purpose, 2 vCore, 50GB database. The monthly cost is around €130.

The Grafana VMs are Standard D1 v2 virtual machines (can be changed). These two machines cost around €100 per month. By default, these virtual machines have a public IP that allows SSH access on port 22. To log on, use the password or public key you configured during deployment.

To access the Grafana portal, Bitnami used an Azure Application Gateway. They used the Standard tier (not WAF) with the Medium SKU size and three nodes. The monthly cost for this setup is around €140.

The public IP address of the front-end can be found in the list of resources (e.g. in my case, mygrafanaagw-ip). The IP address will have an associated DNS name in the form of
mygrafanaRANDOMTEXT-agw-dns.westeurope.cloudapp.azure.com. Simply connect to that URL to access your Grafana instance:

Grafana instance (after logging on and showing a simple dashboard)

Naturally, you will want to access Grafana over SSL. That is something you will need to do yourself. For more information see this link.

It goes without saying that the template only takes care of deployment. Once deployed, you are responsible for the infrastructure! Security, backup, patching etc… is your responsibility!

Note that the template does not allow you to easily select the virtual network to deploy to. By default, the template creates a virtual network with address space 10.0.0.0/16. If you have some ARM templating skills, you can download the template right after validation but before deployment and modify it:

Downloading the template for modification

Conclusion

Setting up a multi-tier Grafana stack with Bitnami is very easy. Note that the cost of this deployment is around €370 per month though. Instead of deploying and managing Grafana yourself, you can also take a look at hosted offerings such as Grafana Cloud or Aiven Grafana.

Creating and containerizing a TensorFlow Go application

In an earlier post, I discussed using a TensorFlow model from a Go application. With the TensorFlow bindings for Go, you can load a model that was exported with TensorFlow’s SavedModelBuilder module. That module saves a “snapshot” of a trained model which can be used for inference.

In this post, we will actually use the model in a web application. The application presents the user with a page to upload an image:

The upload page

The class and its probability are displayed, along with the processed image:

Clearly a hen!

The source code of the application can be found at https://github.com/gbaeke/nasnet-go. If you just want to try the application, use Docker and issue the following command (replace port 80 with another port if there is a conflict):

docker run -p 80:9090 -d gbaeke/nasnet

The image is around 2.55GB in size so be patient when you first run the application. When the container has started, open your browser at http://localhost to see the upload page.

To quickly try it, you can run the container on Azure Container Instances. If you use the Portal, specify port 9090 as the container port.

Nasnet container in ACI

A closer look at the app

**UPDATE**: since first publication, the HTTP handler code was moved from main.go to handlers/handlers.go

In the init() function, the nasnet model is loaded with tf.LoadSavedModel. The ImageNet categories are also loaded with a call to getCategories() and stored in categories which is a map of int to a string array.

In main(), we simply print the TensorFlow version (1.12). Next, http.HandleFunc is used to set up a handler (the upload func) for when users connect to the root of the web app.

Naturally, most of the logic is in the upload function. In summary, it does the following:

  • when users just navigate to the page (HTTP GET verb), render the upload.gtpl template; that template contains the upload form and uses a bit of bootstrap to make it just a bit better looking (and that’s already an overstatement); to learn more about Go web templates, see this link.
  • when users submit a file (POST), the following happens:
    • read the image
    • convert the image to a tensor with the getTensor function; getTensor returns a *tf.Tensor; the tensor is created from a [1][224][224][3] array; note that each pixel value gets normalized by subtracting 127.5 and then dividing by 127.5, which is equivalent to the preprocessing applied in Keras (divide by 127.5 and subtract 1)
    • run a session by inputting the tensor and getting the categories and probabilities as output
    • look for the highest probability and save it, together with the category name in a variable of type ResultPageData (a struct)
    • the struct data is used as input for the response.gtpl template
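The normalization step in the list above can be sketched as follows. This is not the repository's getTensor code: normalizePixel, newInput and the pixel accessor are hypothetical names, and the [height][width] index order is an assumption. The sketch only shows how 8-bit channel values end up in the range [-1, 1] inside a [1][224][224][3] array.

```go
package main

import "fmt"

// normalizePixel maps an 8-bit channel value (0-255) to the range [-1, 1].
// (v - 127.5) / 127.5 is the same as v/127.5 - 1.
func normalizePixel(v uint8) float32 {
	return (float32(v) - 127.5) / 127.5
}

// newInput fills a [1][224][224][3] array from a pixel accessor.
// The accessor and the [y][x] ordering are assumptions for this sketch.
func newInput(at func(x, y int) (r, g, b uint8)) *[1][224][224][3]float32 {
	var input [1][224][224][3]float32
	for y := 0; y < 224; y++ {
		for x := 0; x < 224; x++ {
			r, g, b := at(x, y)
			input[0][y][x][0] = normalizePixel(r)
			input[0][y][x][1] = normalizePixel(g)
			input[0][y][x][2] = normalizePixel(b)
		}
	}
	return &input
}

func main() {
	// A uniformly white image, purely for illustration.
	white := func(x, y int) (r, g, b uint8) { return 255, 255, 255 }
	input := newInput(white)
	fmt.Println(normalizePixel(0), normalizePixel(255), input[0][0][0][0])
}
```

In the real application, an array like this would be passed to the TensorFlow bindings to create the tensor.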

Note that the image is also shown in the output. The processed image (resized to 224×224) gets converted to a base64-encoded string. That string can be used in HTML image rendering as follows (where {{.Picture}} in the template will be replaced by the encoded string):

 <img src="data:image/jpg;base64,{{.Picture}}"> 
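Producing the string for {{.Picture}} only needs the standard library. The sketch below assumes the resized image is already available as JPEG bytes; encodePicture and imgTag are hypothetical helper names, not functions from the repository.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// encodePicture returns the base64 string that would replace {{.Picture}}.
func encodePicture(jpegBytes []byte) string {
	return base64.StdEncoding.EncodeToString(jpegBytes)
}

// imgTag shows the resulting HTML without going through a template.
func imgTag(jpegBytes []byte) string {
	return `<img src="data:image/jpg;base64,` + encodePicture(jpegBytes) + `">`
}

func main() {
	// The first three bytes of a JPEG file, used here as stand-in image data.
	fmt.Println(imgTag([]byte{0xFF, 0xD8, 0xFF}))
}
```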

Note that the application lacks sufficient error checking to gracefully handle the upload of non-image files. Maybe I’ll add that later! 😉

Containerization

To containerize the application, I used the Dockerfile from https://github.com/tinrab/go-tensorflow-image-recognition but removed the step that downloads the InceptionV3 model. My application contains a ready to use NasnetMobile model.

The container image is based on tensorflow/tensorflow:1.12.0. It is further modified as required with the TensorFlow C API and the installation of Go. As discussed earlier, I uploaded a working image on Docker Hub.

Conclusion

Once you know how to use TensorFlow models from Go applications, it is easy to embed them in any application, from command-line tools to APIs to web applications. Although this application does server-side processing, you can also use a model directly in the browser with TensorFlow.js or ONNX.js. For ONNX, try https://microsoft.github.io/onnxjs-demo/#/resnet50 to perform image classification with ResNet50 in the browser. You will notice that it will take a while to get started due to the model being downloaded. Once the model is downloaded, you can start classifying images. Personally, I prefer the server-side approach but it all depends on the scenario.

Virtual Node support in Azure Kubernetes Service (AKS)

Although I am using Kubernetes a lot, I had not yet gotten around to trying the virtual nodes support. Virtual nodes is basically the implementation on AKS of the virtual kubelet project. The virtual kubelet project allows Kubernetes nodes to be backed by other services that support containers such as AWS Fargate, IoT Edge, Hyper.sh or Microsoft’s ACI (Azure Container Instances). The idea is that you spin up containers using the familiar Kubernetes API but on services like Fargate and ACI that can instantly scale and only charge you for the seconds the containers are running.

As expected, the virtual nodes support that is built into AKS uses ACI as the backing service. To use it, you need to deploy Kubernetes with virtual nodes support. Use either the CLI or the Azure Portal:

  • CLI: uses the Azure CLI on your machine or from cloud shell
  • Portal: uses the Azure Portal

Note that virtual nodes for AKS are currently in preview. Virtual nodes require AKS to be configured with the advanced network option. You will need to provide a subnet for the virtual nodes that will be dedicated to ACI. The advanced networking option gives you additional control over IP ranges but also allows you to deploy a cluster in an existing virtual network. Note that advanced networking results in the use of the Azure Virtual Network container network interface. Each pod on a regular host gets its own IP address on the virtual network. You will see them in the network as connected devices:

Connected devices on the Kubernetes VNET (includes pods)

In contrast, the containers you will create in the steps below will not show up as connected devices since they are managed by ACI which works differently.

Ok, go ahead and deploy a Kubernetes cluster or just follow along. After deployment, kubectl get nodes will show a result similar to the screenshot below:

kubectl get nodes output with virtual node

With the virtual node online, we can deploy containers to it. Let’s deploy the ONNX ResNet50v2 container from an earlier post and scale it up. Create a YAML file like below and use kubectl apply -f path_to_yaml to create a deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: resnet
spec:
  replicas: 1
  selector:
    matchLabels:
      app: resnet
  template:
    metadata:
      labels:
        app: resnet
    spec:
      containers:
      - name: onnxresnet50v2
        image: gbaeke/onnxresnet50v2
        ports:
        - containerPort: 5001
        resources:
          requests:
            cpu: 1
          limits:
            cpu: 1
      nodeSelector:
        kubernetes.io/role: agent
        beta.kubernetes.io/os: linux
        type: virtual-kubelet
      tolerations:
      - key: virtual-kubelet.io/provider
        operator: Exists
      - key: azure.com/aci
        effect: NoSchedule

With the nodeSelector, you constrain a pod to run on particular nodes in your cluster. In this case, we want to deploy on a host of type virtual-kubelet. With the toleration, you specify that the container can be scheduled on the hosts that match the specified taints. There are two taints here, virtual-kubelet.io/provider and azure.com/aci which are applied to the virtual kubelet node.

After executing the above YAML, I get the following result after kubectl get pods -o wide:

The pod is pending on node virtual-node-aci-linux

After a while, the pod will be running, but it’s actually just a container on ACI.

Let’s expose the deployment with a public IP via an Azure load balancer:

kubectl expose deployment resnet --port=80 --target-port=5001 --type=LoadBalancer

The above command creates a service of type LoadBalancer that maps port 80 of the Azure load balancer to, eventually, port 5001 of the container. Just use kubectl get svc to see the external IP address. Configuring the load balancer usually takes around one minute.

Now let’s try to scale the deployment to 100 containers:

kubectl scale --replicas=100 deployments/resnet

Instantly, the containers will be provisioned on ACI via the virtual kubelet:

NAME                      READY     STATUS     RESTARTS   AGE
resnet-6d7954d5d7-26n6l 0/1 Waiting 0 30s
resnet-6d7954d5d7-2bjgp 0/1 Creating 0 30s
resnet-6d7954d5d7-2jsrs 0/1 Creating 0 30s
resnet-6d7954d5d7-2lvqm 0/1 Pending 0 27s
resnet-6d7954d5d7-2qxc9 0/1 Creating 0 30s
resnet-6d7954d5d7-2wnn6 0/1 Creating 0 28s
resnet-6d7954d5d7-44rw7 0/1 Creating 0 30s
.... repeat ....

When you run kubectl get endpoints you will see all the endpoints “behind” the resnet service:

NAME         ENDPOINTS                                                       
resnet 40.67.216.68:5001,40.67.219.10:5001,40.67.219.22:5001
+ 97 more…

In container monitoring:

Hey, one pod has an issue! Who cares right?

As you can see, it is really easy to get started with virtual nodes and to scale up a deployment. In a later post, I will take a look at auto scaling containers on a virtual node.

Using the Microsoft Face API to detect emotions in photos and video

In a previous post, I blogged about detecting emotions with the ONNX FER+ model. As an alternative, you can use cloud models hosted by major cloud providers such as Microsoft, Amazon and Google. Besides those, there are many other services to choose from.

To detect facial emotions with Azure, there is a Face API in two flavours:

  • Cloud: API calls are sent to a cloud-hosted endpoint in the selected deployment region
  • Container: API calls are sent to a container that you deploy anywhere, including the edge (e.g. IoT Edge device)

To use the container version, you need to request access via this link. In another blog post, I already used the Text Analytics container to detect sentiment in a piece of text.

Note that the container version is not free and needs to be configured with an API key. The API key is obtained by deploying the Face API in the cloud. Doing so generates a primary and secondary key. Be aware that the Face API container, like the Text Analytics container, needs connectivity to the cloud to ensure proper billing. It cannot be used in completely offline scenarios. In short, no matter the flavour you use, you need to deploy the Face API. It will appear in the portal as shown below:

Deployed Face API (part of Cognitive Services)

Using the API is a simple matter. An image can be delivered to the API in two ways:

  • Link: just provide a URL to an image
  • Octet-stream: POST binary data (the image’s bytes) to the API

In the Go example you can find on GitHub, the second approach is used. You simply open the image file (e.g. a jpg or png) and pass the byte array to the endpoint. The endpoint is in the following form for emotion detection:

https://westeurope.api.cognitive.microsoft.com/face/v1.0/detect?returnFaceAttributes=emotion

Instead of emotion, you can ask for other attributes or a combination of attributes: age, gender, headPose, smile, facialHair, glasses, emotion, hair, makeup, occlusion, accessories, blur, exposure and noise. You simply add them together with +’s (e.g. emotion+age+gender). When you add attributes, the cost per call will increase slightly as will the response time. With the additional attributes, the Face API is much more useful than the simple FER+ model. The Face API has several additional features such as storing and comparing faces. Check out the documentation for full details.
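As a rough sketch of the octet-stream approach, the request can be built with the standard library alone. This is not the code from the GitHub example: newDetectRequest is a hypothetical helper and YOUR_KEY is a placeholder. The key goes into the Ocp-Apim-Subscription-Key header and the image bytes form the request body.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// newDetectRequest builds (but does not send) a POST request that carries
// the image bytes as an octet-stream. The helper name is hypothetical.
func newDetectRequest(endpoint, key string, image []byte) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(image))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/octet-stream")
	req.Header.Set("Ocp-Apim-Subscription-Key", key)
	return req, nil
}

func main() {
	endpoint := "https://westeurope.api.cognitive.microsoft.com/face/v1.0/detect?returnFaceAttributes=emotion"
	// Stand-in image bytes and a placeholder key, for illustration only.
	req, err := newDetectRequest(endpoint, "YOUR_KEY", []byte{0xFF, 0xD8, 0xFF})
	if err != nil {
		fmt.Println("building request failed:", err)
		return
	}
	fmt.Println(req.Method, req.URL.Host, req.Header.Get("Content-Type"))
}
```

Sending the request with http.DefaultClient.Do(req) and decoding the JSON response is left out to keep the sketch short.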

To detect emotion in a video, the sample at https://github.com/gbaeke/emotion/blob/master/main.go contains some commented out code in the import section and around line 100 so you can use the Face API via the github.com/gbaeke/emotion/faceapi/msface package’s GetEmotion() function instead of the GetEmotion() function in the code. Because we have the full webcam image and face in an OpenCV mat, some extra code is needed to serialize it to a byte stream in a format the Face API understands:

encodedImage, _ := gocv.IMEncode(gocv.JPEGFileExt, face)       
emotion, err = msface.GetEmotion(bytes.NewReader(encodedImage))

In the above example, the face region detected by OpenCV is encoded to a JPG format as a byte slice. The byte slice is simply converted to an io.Reader and handed to the GetEmotion() function in the msface package.

When you use the Face API to detect emotions in a video stream from a webcam (or a video file), you will be hitting the API quite hard. You will surely need the standard tier of the API, which allows 10 transactions per second. To add face and emotion detection to video, the solution discussed in Detecting emotions with FER+ is a better option.
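If you do want to stay under the 10 transactions per second, one simple option is to send only every Nth frame. The sketch below is not part of the sample on GitHub; sampleEvery is a hypothetical helper and the frame rates are illustrative.

```go
package main

import "fmt"

// sampleEvery returns N such that sending every Nth frame keeps the number
// of API calls at or below maxCallsPerSecond for a given frame rate.
func sampleEvery(fps, maxCallsPerSecond int) int {
	if fps <= 0 || maxCallsPerSecond <= 0 {
		return 1
	}
	// Integer ceiling of fps / maxCallsPerSecond.
	return (fps + maxCallsPerSecond - 1) / maxCallsPerSecond
}

func main() {
	n := sampleEvery(30, 10)
	calls := 0
	// Simulate one second of video at 30 frames per second.
	for frame := 0; frame < 30; frame++ {
		if frame%n == 0 {
			calls++ // this is where the Face API call would go
		}
	}
	fmt.Println("send every", n, "frames:", calls, "calls per second")
}
```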

Detecting emotions with FER+

In an earlier post, I discussed classifying images with the ResNet50v2 model. Azure Machine Learning Service was used to create a container image that used the ONNX ResNet50v2 model and the ONNX Runtime for scoring.

Continuing on that theme, I created a container image that uses the ONNX FER+ model that can detect emotions in an image. The container image also uses the ONNX Runtime for scoring.

You might wonder why you would want to detect emotions this way when there are many services available that can do this for you with a simple API call! You could use Microsoft’s Face API or Amazon’s Rekognition for example. While those services are easy to use and provide additional features, they do come at a cost. If all you need is basic detection of emotions, using this FER+ container is sufficient and cost effective.

Azure Face API (image from Microsoft website)

A notebook to create the image and deploy a container to Azure Container Instances (ACI) can be found here. The notebook uses the Azure Machine Learning SDK to register the model to an Azure Machine Learning workspace, build a container image from that model and deploy the container to ACI. The scoring script score.py is shown below.

score.py

The model expects a 64×64 grayscale image of a face in an array with the following dimensions: [1][1][64][64]. The output is JSON with a results array that contains the probabilities for each emotion and a time field with the inference time.

The emotion probabilities are in this order:

0: "neutral", 1: "happy", 2: "surprise", 3: "sadness", 4: "anger", 5: "disgust", 6: "fear", 7: "contempt"
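Given that order, picking the most likely emotion is a matter of finding the index with the highest probability. The sketch below assumes the probabilities have already been extracted from the JSON response into a flat slice; topEmotion is a hypothetical helper and the sample values are made up.

```go
package main

import "fmt"

// emotions lists the labels in the order the model returns them.
var emotions = []string{"neutral", "happy", "surprise", "sadness", "anger", "disgust", "fear", "contempt"}

// topEmotion returns the label and probability of the highest-scoring emotion.
func topEmotion(probs []float64) (string, float64, error) {
	if len(probs) != len(emotions) {
		return "", 0, fmt.Errorf("expected %d probabilities, got %d", len(emotions), len(probs))
	}
	best := 0
	for i, p := range probs {
		if p > probs[best] {
			best = i
		}
	}
	return emotions[best], probs[best], nil
}

func main() {
	// Made-up probabilities for illustration.
	label, p, err := topEmotion([]float64{0.1, 0.7, 0.05, 0.05, 0.04, 0.03, 0.02, 0.01})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(label, p)
}
```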

To actually capture the emotions, I wrote a small demo program in Go that uses OpenCV (via GoCV). You can find it on GitHub: https://github.com/gbaeke/emotion. You will need to install OpenCV and GoCV. Find the instructions here: https://gocv.io/getting-started/linux/. There are similar instructions for Mac and Windows but I have not tried those.

The program is still a little rough around the edges but it does the trick. The scoring URI is hard coded to http://localhost:5002/score. With Docker installed, use the following command to start the scoring container:

 docker run -d -p 5002:5001 gbaeke/onnxferplus

Have fun with it!