GitOps with Weaveworks Flux – Installing and Updating Applications

In a previous post, we installed Weaveworks Flux. Flux synchronizes the contents of a git repository with your Kubernetes cluster. With Flux, applications can easily be installed via Helm charts. As an example, we installed Traefik by adding the following yaml to the synced repository:

apiVersion: helm.fluxcd.io/v1
kind: HelmRelease
metadata:
  name: traefik
  namespace: default
  annotations:
    fluxcd.io/ignore: "false"
spec:
  releaseName: traefik
  chart:
    repository: https://kubernetes-charts.storage.googleapis.com/
    name: traefik
    version: 1.78.0
  values:
    serviceType: LoadBalancer
    rbac:
      enabled: true
    dashboard:
      enabled: true   

It does not matter where you put this file because Flux scans the complete repository. I added the file to a folder called traefik.

If you look more closely at the YAML file, you’ll notice its kind is HelmRelease. You need an operator that can handle this type of file: the Flux Helm operator. In the previous post, we installed the custom resource definition and the operator manually.

Adding a custom application

Now it’s time to add our own application. You do not need to use Helm packages or the Helm operator to install applications. Regular yaml will do just fine.

The application we will deploy needs a Redis backend. Let’s deploy that first. Add the following yaml file to your repository:

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis
  labels:
    app: redis       
spec:
  selector:
    matchLabels:     
      app: redis
  replicas: 1        
  template:          
    metadata:
      labels:        
        app: redis
    spec:            
      containers:
      - name: redis
        image: redis
        resources:
          requests:
            cpu: 200m
            memory: 100Mi
        ports:
        - containerPort: 6379
---        
apiVersion: v1
kind: Service        
metadata:
  name: redis
  labels:            
    app: redis
spec:
  ports:
  - port: 6379       
    targetPort: 6379
  selector:          
    app: redis

After committing this file, wait a moment or run fluxctl sync. When you run kubectl get pods for the default namespace, you should see the Redis pod:

Redis is running — yay!!!
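If you prefer to type it out, these are the commands; fluxctl sync forces an immediate sync instead of waiting for the next git poll, and the label selector matches the app: redis label from the manifest:

fluxctl sync
kubectl get pods -n default -l app=redis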

Now it’s time to add the application. I will use an image based on the following code: https://github.com/gbaeke/realtime-go (the httponly branch, because master contains code to automatically request a certificate with Let’s Encrypt). I pushed the image to Docker Hub as gbaeke/fluxapp:1.0.0. Now let’s deploy the app with the following yaml:

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: realtime
  labels:
    app: realtime       
spec:
  selector:
    matchLabels:     
      app: realtime
  replicas: 1        
  template:          
    metadata:
      labels:        
        app: realtime
    spec:            
      containers:
      - name: realtime
        image: gbaeke/fluxapp:1.0.0
        env:
        - name: REDISHOST
          value: "redis:6379"
        resources:
          requests:
            cpu: 50m
            memory: 50Mi
          limits:
            cpu: 150m
            memory: 150Mi
        ports:
        - containerPort: 8080
---        
apiVersion: v1
kind: Service        
metadata:
  name: realtime
  labels:            
    app: realtime
spec:
  ports:
  - port: 80       
    targetPort: 8080
  selector:          
    app: realtime
---
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
  name: realtime-ingress
spec:
  rules:
  - host: realtime.IP.xip.io
    http:
      paths:
      - path: /
        backend:
          serviceName: realtime
          servicePort: 80

In the above yaml, replace IP in the Ingress specification with the IP of the external load balancer used by your Ingress Controller. Once you add the yaml to the git repository and run fluxctl sync, the application should be deployed. You see the following page when you browse to http://realtime.IP.xip.io:

Web app deployed via Flux and standard yaml

Great, v1.0.0 of the app is deployed using the gbaeke/fluxapp:1.0.0 image. But what if I have a new version of the image and the yaml specification does not change? Read on…

Upgrading the application

If you have been following along, you can now run the following command:

fluxctl list-workloads -a

This will list all workloads on the cluster, including the ones that were not installed by Flux. If you check the list, none of the workloads are automated. When a workload is automated, it can automatically upgrade the application when a new image appears. Let’s try to automate the fluxapp. To do so, you can either add annotations to your yaml or use fluxctl. Let’s use the yaml approach by adding the following to our deployment:

annotations:
    flux.weave.works/automated: "true"
    flux.weave.works/tag.realtime: semver:~1.0

Note: Flux only works with immutable tags; do not use latest
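To be clear about placement, the annotations go in the metadata section of the realtime deployment; a minimal sketch:

metadata:
  name: realtime
  labels:
    app: realtime
  annotations:
    flux.weave.works/automated: "true"
    flux.weave.works/tag.realtime: semver:~1.0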

After committing the file and running fluxctl sync, you can run fluxctl list-workloads -a again. The deployment should now be automated:

fluxapp is now automated

Now let’s see what happens when we add a new version of the image with tag 1.0.1. That image uses a different header color to show the difference. Flux monitors the container registry for new tags. When it detects a new version of the image that matches the semver filter, it will modify the deployment. Let’s check with fluxctl list-workloads -a:

new image deployed

And here’s the new color:

New color in version 1.0.1. Exciting! 😊

But wait… what about the git repo?

With the configuration of a deploy key, Flux has access to the git repository. When a deployment is automated and the image is changed, that change is also reflected in the git repo:

Weave Flux updated the realtime yaml file

In the yaml, version 1.0.1 is now used:

Flux updated the yaml file

What if I don’t like this release? With fluxctl, you can roll back to a previous version like so:

Rolling back a release – will also update the git repo

Although this works, the deployment will be updated to 1.0.1 again since it is automated. To avoid that, first lock the deployment (or workload) and then force the release of the old image:

fluxctl lock -w=deployment/realtime

fluxctl release -n default --workload=deployment/realtime --update-image=gbaeke/fluxapp:1.0.0 --force

In your yaml, there will be an additional annotation, fluxcd.io/locked: 'true', and the image will be set to 1.0.0.
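When you want automation to take over again, unlock the workload:

fluxctl unlock -w=deployment/realtime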

Conclusion

In this post, we looked at deploying and updating an application via Flux automation. You only need a couple of annotations to make this work. This was just a simple example. For an example with dev, staging and production branches and promotion from staging to production, be sure to look at https://github.com/fluxcd/helm-operator-get-started as well.

Using the OAuth Client Credentials Flow

I often get questions about protecting applications like APIs using OAuth. I guess you know the drill:

  • you have to obtain a token (typically a JWT or JSON Web Token)
  • the client submits the token to your backend (via an Authorization HTTP header)
  • the token needs to be verified (do you trust it?)
  • you need to grab some fields from the token to use in your application (claims).

When the client is a daemon or some server side process, you can use the client credentials grant flow to obtain the token from Azure AD. The flow works as follows:

OAuth Client Credentials Flow (image from Microsoft docs)

The client contacts the Azure AD token endpoint to obtain a token. The client request contains a client ID and client secret to properly authenticate to Azure AD as a known application. The token endpoint returns the token. In this post, I only focus on the access token which is used to access the resource web API. The client uses the access token in the Authorization header of requests to the API.

Let’s see how this works. Oh, and by the way, this flow should be done with Azure AD. Azure AD B2C does not support this type of flow (yet).

Create a client application in Azure AD

In Azure AD, create a new App Registration. This can be a standard app registration for Web APIs. You do not need a redirect URL or configure public clients or implicit grants.

Standard run of the mill app registration

In Certificates & secrets, create a client secret and write it down. It will not be shown anymore when you later come back to this page:

Yes, I set it to Never Expire!

From the Overview page, note the application ID (also called the client ID). You will need it later to request a token.

Why do we even create this application? It represents the client application that will call your APIs. With this application, you control the secret that the client application uses but also the access rights to the APIs as we will see later. The client application will request a token, specifying the client ID and the client secret. Let’s now create another application that represents the backend API.

Create an API application in Azure AD

This is another App Registration, just like the app registration for the client. In this case, it represents the API. Its settings are a bit different though. There is no need to specify redirect URIs or other settings in the Authentication setting. There is also no need for a client secret. We do want to use the Expose an API page though:

Expose API page

Make sure you get the application ID URI. In the example above, it is api://06b2a484-141c-42d3-9d73-32bec5910b06 but you can change that to something more descriptive.

When you use the client credentials grant, you do not use user scopes. As such, the Scopes defined by this API list is empty. Instead, you want to use application roles which are defined in the manifest:

Application role in the manifest

There is one role here called invokeRole. You need to generate a GUID manually and use that as the id. Make sure allowedMemberTypes contains Application.
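For reference, the appRoles section in the manifest looks roughly like the snippet below; the id is a placeholder GUID you generate yourself and the description/display name are just examples:

"appRoles": [
  {
    "allowedMemberTypes": [ "Application" ],
    "description": "Allows the client to invoke the API",
    "displayName": "invokeRole",
    "id": "00000000-0000-0000-0000-000000000000",
    "isEnabled": true,
    "value": "invokeRole"
  }
]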

Great! But now we need to grant the client the right to obtain a token for one or more of the roles. You do that in the client application, in API Permissions:

Client application is granted access to the invokeRole application role of the API application

To grant the permission, just click Add a permission, select My APIs, click your API and select the role:

Selecting the role

Delegated permissions is greyed out because there are no user scopes. Application permissions is active because we defined an application role on the API application.

Obtaining a token

The server-side application only needs to do one call to the token endpoint to obtain the access token. Here is an example call with curl:

curl -d "grant_type=client_credentials&client_id=f1f695cb-2d00-4c0f-84a5-437282f3f3fd&client_secret=SECRET&audience=api%3A%2F%2F06b2a484-141c-42d3-9d73-32bec5910b06&scope=api%3A%2F%2F06b2a484-141c-42d3-9d73-32bec5910b06%2F.default" -X POST "https://login.microsoftonline.com/019486dd-8ffb-45a9-9232-4132babb1324/oauth2/v2.0/token"

Ouch, lots of gibberish here. Let’s break it down:

  • the POST needs to send URL encoded data in the body; curl’s -d sends it as form data, but you need to perform the URL encoding of the values yourself
  • grant_type: client_credentials to indicate you want to use this flow
  • client_id: the application ID of the client app registration in Azure AD
  • client_secret: URL encoded secret that you generated when you created the client app registration
  • audience: the resource you want an access token for; it is the URL encoding of api://06b2a484-141c-42d3-9d73-32bec5910b06 as set in Expose an API
  • scope: this one is a bit special; for the v2 endpoint that we use here it needs to be api://06b2a484-141c-42d3-9d73-32bec5910b06/.default (but URL encoded); the scope (or roles) that the client application has access to will be included in the token
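If you want to avoid the manual URL encoding, curl can do it for you with --data-urlencode; the call below should be equivalent to the one above (same IDs and placeholders):

curl -X POST "https://login.microsoftonline.com/019486dd-8ffb-45a9-9232-4132babb1324/oauth2/v2.0/token" \
  --data-urlencode "grant_type=client_credentials" \
  --data-urlencode "client_id=f1f695cb-2d00-4c0f-84a5-437282f3f3fd" \
  --data-urlencode "client_secret=SECRET" \
  --data-urlencode "audience=api://06b2a484-141c-42d3-9d73-32bec5910b06" \
  --data-urlencode "scope=api://06b2a484-141c-42d3-9d73-32bec5910b06/.default"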

The POST goes to the Azure AD v2.0 token endpoint. There is also a v1 endpoint which would require other fields. See the Microsoft docs for more info. Note that I also updated the application manifests to issue v2 tokens via the accessTokenAcceptedVersion field (set to 2).

The call only returns an access token (there is no refresh token in the client credentials flow). Something like below, with the token shortened:

{"token_type":"Bearer","expires_in":3600,"ext_expires_in":3600,"access_token":"eyJ0e..."}

The access_token can be decoded on https://jwt.ms:

Decoded token

Note that the invokeRole is present because the client application was granted access to that role. We also know the application ID that represents the API, which is in the aud field. The azp field contains the application ID of the client application.

Great, we can now use this token to call our API. The raw HTTP request would be in this form.

GET https://somehost/calc/v1/add/1/1 HTTP/1.1 
Host: somehost 
Authorization: Bearer eyJ0e...

Of course, your application needs to verify the token somehow. This can be done in your application or in an intermediate layer such as API Management. We will take a look at how to do this with API Management in a later post.
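As an illustration of doing it in the application itself, here is a minimal Node.js sketch that verifies a v2 access token against the Azure AD signing keys; it assumes the jsonwebtoken and jwks-rsa npm packages, and TENANT_ID and API_APP_ID are placeholders for your tenant ID and the application ID of the API app registration:

const jwt = require('jsonwebtoken');
const jwksClient = require('jwks-rsa');

const client = jwksClient({
    jwksUri: 'https://login.microsoftonline.com/TENANT_ID/discovery/v2.0/keys'
});

// look up the signing key that matches the kid in the token header
function getKey(header, callback) {
    client.getSigningKey(header.kid, (err, key) => {
        if (err) return callback(err);
        callback(null, key.publicKey || key.rsaPublicKey);
    });
}

function verifyToken(token) {
    return new Promise((resolve, reject) => {
        jwt.verify(token, getKey, {
            audience: 'API_APP_ID',                                      // aud must be the API app
            issuer: 'https://login.microsoftonline.com/TENANT_ID/v2.0'   // iss for v2 tokens
        }, (err, decoded) => (err ? reject(err) : resolve(decoded)));
    });
}

After verification, you can check the roles claim of the decoded token for invokeRole before allowing the call.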

Conclusion

Authentication, authorization and, on a broader scale, identity can be very challenging. Technically though, a flow such as the client credentials flow is fairly simple to implement once you have done it a few times. Hopefully, if you are/were struggling with this type of flow, this post has given you some pointers!

Exposing a local endpoint to the Internet with inlets

A while ago, I learned about inlets by Alex Ellis. It allows you to expose an endpoint on your internal network via a tunnel to an exit node. To actually reach your internal website, you navigate to the public IP and port of the exit node. Something like this:

Internet user --> public IP:port of exit node -- tunnel --> your local endpoint

On both the exit node and your local network, you need to run inlets. Let’s look at an example. Suppose I want to expose my Magnificent Image Classifier πŸ˜€ running on my local machine to the outside world. The classifier is actually just a container you can run as follows:

docker run -p 9090:9090 -d gbaeke/nasnet

The container image is big so it will take a while to start. When the container is started, just navigate to http://localhost:9090 to see the UI. You can upload a picture to classify it.

So far so good. Now you need an exit node with a public IP. I deployed a small Azure B-series Linux VM (B1s; 7 euros/month). SSH into that VM and install the inlets CLI (yeah, I know piping a script to sudo sh is dangerous 😏):

curl -sLS https://get.inlets.dev | sudo sh

Now run the inlets server (from instructions here):

export token=$(head -c 16 /dev/urandom | shasum | cut -d" " -f1) 
inlets server --port=9090 --token="$token"

The first line just generates a random token. You can use any token you want or even omit a token (not recommended). The second command runs the server on port 9090. It’s the same port as my local endpoint but that is not required. You can use any valid port.

TIP: the Azure VM had a network security group (NSG) configured so I had to add TCP port 9090 to the allow list

Now that the server is running, let’s run the client. Install inlets like above or use brew install inlets on a Mac and run the following commands:

export REMOTE="IP OF EXIT NODE:9090"
export TOKEN="TOKEN FROM SERVER"
inlets client \
   --remote=$REMOTE \
   --upstream=http://127.0.0.1:9090 \
   --token $TOKEN

The inlets client will establish a web sockets connection to the inlets server on the exit node. The --upstream option is used to specify the local endpoint. In my case, that’s the classifier container (nasnet-go).

I can now browse to the public IP and port of the inlets server to see the classifier UI:

The inlets server will show the logs:

I think inlets is a fantastic tool that is useful in many scenarios. I have used ngrok in the past but it has some limits. You can pay to remove those limits. Inlets, on the other hand, is fully open source and not limited in any way. Be sure to check out the inlets GitHub page which has lots more details. Highly recommended!!!

Azure DevOps multi-stage YAML pipelines

A while ago, the Azure DevOps blog posted an update about multi-stage YAML pipelines. The concept is straightforward: define both your build (CI) and release (CD) pipelines in a YAML file and stick that file in your source code repository.

In this post, we will look at a simple build and release pipeline that builds a container, pushes it to ACR, deploys it to Kubernetes linked to an environment. Something like this:

Two stages in the pipeline – build and deploy (as simple as it can get, almost)

Note: I used a simple go app, a Dockerfile and a Kubernetes manifest as source files, check them out here.

Note: there is also a video version πŸ˜‰

Note: if you start from a repository without manifests and azure-pipelines.yaml, the pipeline build wizard will propose Deploy to Azure Kubernetes Service. The wizard that follows will ask you some questions but in the end you will end up with a configured environment, the necessary service connections to AKS and ACR and even a service.yaml and deployment.yaml with the bare minimum to deploy your container!

“Show me the YAML!!!”

The file, azure-pipelines.yaml contains the two stages. Check out the first stage (plus trigger and variables) below:

trigger:
- master

variables:
  imageName: 'gosample'
  registry: 'REGNAME.azurecr.io'

stages:
- stage: build
  jobs:
  - job: 'BuildAndPush'
    pool:
      vmImage: 'ubuntu-latest'
    steps:
    - task: Docker@2
      inputs:
        containerRegistry: 'ACR'
        repository: '$(imageName)'
        command: 'buildAndPush'
        Dockerfile: '**/Dockerfile'
    - task: PublishPipelineArtifact@0
      inputs:
        artifactName: 'manifests'
        targetPath: 'manifests' 

The pipeline runs on a commit to the master branch. The variables imageName and registry are referenced later using $(imageName) and $(registry). Replace REGNAME with the name of your Azure Container Registry.

It’s a multi-stage pipeline, so we start with stages: and then define the first stage build. That stage has one job which consists of two steps:

  • Docker task (v2): build a Docker image based on the Dockerfile in the source code repository and push it to the container registry called ACR; ACR is a reference to a service connection defined in the project settings
  • PublishPipelineArtifact: the source code repository contains Kubernetes deployment manifests in YAML format in the manifests folder; the contents of that folder is published as a pipeline artifact, to be picked up in a later stage

Now let’s look at the deployment stage:

- stage: deploy
  jobs:
  - deployment: 'DeployToK8S'
    pool:
      vmImage: 'ubuntu-latest'
    environment: dev
    strategy:
      runOnce:
        deploy:
          steps:
            - task: DownloadPipelineArtifact@1
              inputs:
                buildType: 'current'
                artifactName: 'manifests'
                targetPath: '$(System.ArtifactsDirectory)/manifests'
            - task: KubernetesManifest@0
              inputs:
                action: 'deploy'
                kubernetesServiceConnection: 'dev-kub-gosample-1558821689026'
                namespace: 'gosample'
                manifests: '$(System.ArtifactsDirectory)/manifests/deploy.yaml'
                containers: '$(registry)/$(imageName):$(Build.BuildId)' 

The second stage uses a deployment job, which was quite new at the time of writing. In a deployment job, you can specify an environment to link to. In the above job, the environment is called dev. In Azure DevOps, the environment is shown as below:

dev environment

The environment functionality has Kubernetes integration which is pretty neat. You can drill down to the deployed objects such as deployments and services:

Kubernetes deployment in an Azure DevOps environment

The deployment has two tasks:

  • DownloadPipelineArtifact: download the artifact published in the first stage to $(System.ArtifactsDirectory)/manifests
  • KubernetesManifest: this task can deploy Kubernetes manifests; it uses an AKS service connection that was created during creation of the environment; a service account was created in a specific namespace and with access rights to that namespace only; the manifests property will look for an image name in the Kubernetes YAML files and append the tag which is the build id here

Note that the release stage will actually download the pipeline artifact automatically. The explicit DownloadPipelineArtifact task gives additional control over the download location.

The KubernetesManifest task is relatively new at the time of this writing (end of May 2019). Its image substitution functionality could be enough in many cases, without having to resort to Helm or manual text substitution tasks. There is more to this task than what I have described here. Check out the docs for more info.
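To illustrate the substitution, a bare-bones manifests/deploy.yaml could look like the sketch below; the names are assumptions, the point being that the image field matches $(registry)/$(imageName) so the task can append the build id as the tag:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: gosample
spec:
  replicas: 1
  selector:
    matchLabels:
      app: gosample
  template:
    metadata:
      labels:
        app: gosample
    spec:
      containers:
      - name: gosample
        image: REGNAME.azurecr.io/gosample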

Conclusion

If you are just starting out building CI/CD pipelines in YAML, you will probably have a hard time getting used to the schema. I know I had! 😑 In the end though, doing it this way with the pipeline stored in source control will pay off in the long run. After some time, you will have built up a useful library of these pipelines to quickly get up and running in new projects. Recommended!!! 😉🚀🚀🚀

Securing access to and from Azure Functions

I am often asked how to secure access to and from Azure Functions that are not running in an App Service Environment (ASE). An App Service Environment allows you to safeguard your apps in a subnet of your Azure Virtual Network. In a sense, it gives you a private deployment of Azure App Service that you can secure with Azure Firewall, Network Security Groups (NSGs) or Network Virtual Appliances (NVAs).

When you use Azure Functions in a regular App Service Plan or Premium plan, you will need to rely on Virtual Network Service Endpoints and App Service network integration to achieve similar results.

In this post, we will look at an example of an Azure Function, running in a Premium plan, that queries CosmosDB. We will restrict incoming traffic to the Azure Function from a subnet and only allow CosmosDB to be queried by the same Azure Function. Here’s a diagram:

Incoming traffic

To restrict incoming traffic to the Azure Function, navigate to the Function App in the portal and select Networking in Platform Features. You will see the following screen:

Azure Functions network features

We will configure the inbound restrictions via Configure Access Restrictions. You can configure restrictions for both the Function App itself and the scm site:

From the moment you add rules, a Deny All rule will appear. In the above rules, I allowed my private IP and the default subnet in the virtual network. The second rule configures the service endpoints. When you open the properties of the subnet, you will see:

Service Endpoint of type Microsoft.Web

Great! When you try to access the function from any other location, you will get a 403 error from the Azure Functions front-end. So don’t expect a connection timeout like with regular network security rules.

Outgoing traffic

The example Azure Function uses an HTTP trigger and a Cosmos DB input (cosmos). Documents contain a name property. The query outputs the name found on the first document:

module.exports = async function (context, req, cosmos) {
    context.log(cosmos);
    context.res = {
        body: "hello " + cosmos[0].name
    };
};
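For reference, the cosmos parameter in that code comes from a Cosmos DB input binding; a function.json along these lines is assumed (database, collection and connection setting names are placeholders):

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [ "get" ]
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    },
    {
      "type": "cosmosDB",
      "direction": "in",
      "name": "cosmos",
      "databaseName": "mydb",
      "collectionName": "mycoll",
      "sqlQuery": "SELECT * FROM c",
      "connectionStringSetting": "CosmosDBConnection"
    }
  ]
}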

In order to secure access to Cosmos DB, two features were used:

  1. Azure Functions VNet Integration (VNet integration is currently in preview)
  2. Cosmos DB network service endpoints to restrict access to the subnet that provides the Azure Function hosts with an IP address

Configuring the VNet integration is straightforward, especially when compared to the old style of integration which required a VPN tunnel:

App Service (including Azure Functions) VNet integration

As you can see in the above screenshot, you delegate a subnet to the App Service hosts. In my case, that is subnet func-sec:

Subnet delegated to a service (Microsoft.Web/serverFarms)

The bottom of the screenshot shows the subnet is delegated to the Microsoft.Web/serverFarms service. That is the result of the VNet integration.

You can also see the subnet has service endpoints configured for Cosmos DB. That is the result of the Cosmos DB configuration below:

Service endpoint config in Cosmos DB

In Cosmos DB, an existing virtual network was added. I did not enable the Accept connections from within public Azure datacenters option.

When you remove the service endpoint and you run the Azure Function, the following error is thrown:

Unable to proceed with the request. Please check the authorization claims to ensure the required permissions to process the request. ActivityId: 03b2c11f-2b21-44c9-ab44-61b4864539fe, Microsoft.Azure.Documents.Common/2.2.0.0, Windows/10.0.14393 documentdb-netcore-sdk/2.2.0

Does it work from a VM in the default subnet?

If all went well, I should be able to call the Azure Function from the virtual machine in the default subnet. Let’s try with curl:

Yes, itsme!

The name field in the first document is set to itsme so it worked! Great, the function can be called from the default subnet. In case you are wondering about the use of -p in the ssh command: this virtual machine sat behind an Azure Firewall and the VM ssh port was exposed via a DNAT rule over a random port.

From another location, the following error is shown (wrapped around some HTML but this is the main error):

Error 403 - This web app is stopped

Conclusion

With virtual network service endpoints now available for most Azure PaaS (platform as a service) components, you can ensure those services are only accessed from intended locations. In this example, you saw how to secure access to Azure Functions and Cosmos DB. Service endpoints combined with the App Service VNet integration make it straightforward to secure a Function App end-to-end.

Querying Postgres with GraphQL

I wanted a quick and easy way to build an API that retrieves the ten latest events from a stream of data sent to a TimescaleDB hypertable. Since such a table can be queried by any means supported by Postgres, I decided to use Postgraphile, which automatically provides a GraphQL server for a database.

If you have Node.js installed, just run the following command:

npm install -g postgraphile

Then run the following command to start the GraphQL server:

postgraphile -c "postgres://USER@SERVER:PASSWORD@SERVER.postgres.database.azure.com/DATABASE?ssl=1" --simple-collections only --enhance-graphiql

Indeed, I am using Azure Database for PostgreSQL. Replace the strings in UPPERCASE with your values. I used simple-collections only to, eh, only use simple collections which makes it, well, simpler. πŸ‘πŸ‘πŸ‘

Note: the maintainer of Postgraphile provided a link to what simple-collections actually does; take a look there for a more thorough explanation πŸ˜‰

The result of the above command looks like the screenshot below:

GraphQL Server started

You can now navigate to http://localhost:5000/graphiql to try some GraphQL queries in an interactive environment:

GraphiQL, enhanced with the --enhance-graphiql flag when we started the server

In the Explorer to the left, you can easily click the query together. In this case, that is easy to do since I only want to query a single table and obtain the last ten events for a single device. The resulting query looks like so:

{
  allConditionsList(condition: {device: "pg-1"}, orderBy: TIME_DESC, first: 10) {
    time
    device
    temperature
  }
}

allConditionsList gets created by the GraphQL server by looking at the tables of the database. Indeed, my database contains a conditions table with time, device, temperature and humidity columns.
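For context, the underlying table is something along the lines of the sketch below; the column types are assumptions, and create_hypertable is what turns the table into a TimescaleDB hypertable:

CREATE TABLE conditions (
  time        TIMESTAMPTZ       NOT NULL,
  device      TEXT              NOT NULL,
  temperature DOUBLE PRECISION,
  humidity    DOUBLE PRECISION
);
SELECT create_hypertable('conditions', 'time');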

To finish off, let’s try to obtain the data with a regular POST method to http://localhost:5000/graphql. This is the command to use:

curl -X POST -H "Content-Type: application/json" -d '{"query":"{\n allConditionsList(condition: {device: \"pg-1\"}, orderBy: TIME_DESC, first: 10) {\n time\n device\n temperature\n }\n}\n","variables":null}' http://localhost:5000/graphql

Ugly but it works. To be honest, there is some noise in the above command because of the \n escapes. They are the result of me grabbing the body from the network traffic sent by GraphiQL:

Yes, lazy me grabbing the request payload from GraphiQL and not cleaning it up πŸ˜‰

There is much, much, much more you can do with GraphQL in general and PostGraphile in particular but this was all I needed for now. Hopefully this can help you if you have to throw something together quickly. In a production setting, there is of course much more to think about: hosting the API (preferably in a container), authentication, authorization, performance, etc…

Improving an Azure Function that writes IoT Hub data to TimescaleDB

In an earlier post, I used an Azure Function to write data from IoT Hub to a TimescaleDB hypertable on PostgreSQL. Although that function works for demo purposes, there are several issues. Two of those issues will be addressed in this post:

  1. the INSERT INTO statement used the NOW() function instead of the enqueuedTimeUtc field; that field is provided by IoT Hub and represents the time the message was enqueued
  2. the INSERT INTO query does not use upsert functionality; if for some reason you need to process the IoT Hub data again, you will end up with duplicate data; your code should be idempotent

Using enqueuedTimeUtc

Using the time the event was enqueued means we need to retrieve that field from the message that our Azure Function receives. The Azure Function receives outside information via two parameters: context and eventHubMessage. The enqueuedTimeUtc field is retrieved via the context variable: context.bindingData.enqueuedTimeUtc.

In the INSERT INTO statement, we need to use TIMESTAMP 'UTC time'. In JavaScript, that results in the following:

'insert into conditions(time, device, temperature, humidity) values(TIMESTAMP \'' + context.bindingData.enqueuedTimeUtc + '\',\'' + eventHubMessage.device + '\' ...

Using upsert functionality

Before adding upsert functionality, add a unique constraint to the hypertable like so (via pgAdmin):

CREATE UNIQUE INDEX on conditions (time, device); 

It needs to be on time and device because the time field on its own is not guaranteed to be unique. Now modify the INSERT INTO statement like so:

'insert into conditions(time, device, temperature, humidity) values(TIMESTAMP \'' + context.bindingData.enqueuedTimeUtc + '\',\'' + eventHubMessage.device + '\',' + eventHubMessage.temperature + ',' + eventHubMessage.humidity + ') ON CONFLICT DO NOTHING'; 

Notice the ON CONFLICT clause? When any constraint is violated, we do nothing. We do not add or modify data, we leave it all as it was.
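If you would rather overwrite the existing row when you reprocess data, instead of skipping it, an ON CONFLICT ... DO UPDATE variant works as well; a sketch with example values:

INSERT INTO conditions(time, device, temperature, humidity)
VALUES (TIMESTAMP '2019-05-01 10:00:00', 'device-1', 21.5, 52)
ON CONFLICT (time, device)
DO UPDATE SET temperature = EXCLUDED.temperature, humidity = EXCLUDED.humidity;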

The full Azure Function code is below:

Azure Function code with IoT Hub enqueuedTimeUtc and upsert
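As a minimal sketch of what that function looks like with the two changes applied, assuming the pg npm package, an Event Hub trigger bound to eventHubMessage and connection details in the Function App settings:

const { Pool } = require('pg');

// connection details come from application settings (assumed names)
const pool = new Pool({
    host: process.env.PGHOST,
    user: process.env.PGUSER,
    password: process.env.PGPASSWORD,
    database: process.env.PGDATABASE,
    ssl: true
});

module.exports = async function (context, eventHubMessage) {
    // use the IoT Hub enqueued time instead of NOW() and skip duplicates on reprocessing
    const query = 'insert into conditions(time, device, temperature, humidity) values(TIMESTAMP \'' +
        context.bindingData.enqueuedTimeUtc + '\',\'' + eventHubMessage.device + '\',' +
        eventHubMessage.temperature + ',' + eventHubMessage.humidity + ') ON CONFLICT DO NOTHING';
    await pool.query(query);
};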

Conclusion

The above code is a little bit better already. We are not quite there yet, but the two changes make sure that the date of the event is correct and independent of when the actual processing is done. By adding the constraint and upsert functionality, we make sure we do not end up with duplicate data when we reprocess data from IoT Hub.