Learn to use the Dapr authorization middleware

Based on a customer conversation, I decided to look into the Dapr middleware components. More specifically, I wanted to understand how the OAuth 2.0 middleware, which enables the Authorization Code flow, works.

In the Authorization Code flow, a client is redirected to an authorization URL (https://login.microsoftonline.com/{tenant}/oauth2/authorize) where the user provides credentials interactively. That makes the flow unsuitable for non-interactive service-to-service scenarios. After sign-in, the client obtains a temporary authorization code. That code is handed to your app, which exchanges it for an access token. With the access token, the authenticated user can access your app.
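As a rough illustration of the first step, the sketch below builds the kind of URL the browser is redirected to. The query parameter names (response_type, client_id, redirect_uri, state) are the standard OAuth 2.0 ones; the function name and all values are placeholders, and the exact query string that Dapr generates may differ.

```go
package main

import (
	"fmt"
	"net/url"
)

// authorizeURL builds the URL a browser is sent to at the start of the
// Authorization Code flow. The function name is hypothetical and all
// argument values used in main are placeholders.
func authorizeURL(tenant, clientID, redirectURI, state string) string {
	q := url.Values{}
	q.Set("response_type", "code") // ask the server for an authorization code
	q.Set("client_id", clientID)
	q.Set("redirect_uri", redirectURI) // where the code is sent after sign-in
	q.Set("state", state)              // opaque value echoed back to the client
	return "https://login.microsoftonline.com/" + tenant + "/oauth2/authorize?" + q.Encode()
}

func main() {
	fmt.Println(authorizeURL("TENANTID", "CLIENTID", "https://app.11.12.13.14.nip.io", "xyz"))
}
```

Note that url.Values.Encode sorts the parameters by key, so the order in the printed URL is alphabetical rather than the order in which they were set.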

Instead of coding this OAuth flow in your app, we will let the Dapr middleware handle all of that work. Our app can then pick up the token from an HTTP header. When there is a token, access to the app is granted. Otherwise, Dapr (well, the Dapr sidecar next to your app) redirects your client to the authorization server to get a code.

Let’s take a look at how this all works with Azure Active Directory. Other authorization servers are supported as well: Facebook, GitHub, Google, and more.

What we will build

Some experience with Kubernetes, deployments, ingresses, Ingress Controllers and Dapr is required.

If you think the explanation below can be improved, or I have made errors, do let me know. Let’s go…

Create an app registration

Using Azure AD means we need an app registration! Other platforms have similar requirements.

First, create an app registration following this quick start. In the first step, give the app a name and, for this demo, just select Accounts in this organizational directory only. The redirect URI will be configured later so just click Register.

After following the quick start, you should have:

  • the client ID and client secret: will be used in the Dapr component
  • the Azure AD tenant ID: used in the auth and token URLs in the Dapr component; Dapr needs to know where to redirect to and where to exchange the authorization code for an access token

App registration in my Azure AD Tenant

There is no need for your app to know about these values. All work is done by Dapr and Dapr only!

We will come back to the app registration later to create a redirect URI.

Install an Ingress Controller

We will use an Ingress Controller to provide access to our app’s Dapr sidecar from the Internet, using HTTP.

In this example, we will install ingress-nginx. Use the following commands (requires Helm):

helm upgrade --install ingress-nginx ingress-nginx \
  --repo https://kubernetes.github.io/ingress-nginx \
  --namespace ingress-nginx --create-namespace

Although you will find articles about daprizing your Ingress Controller, we will not do that here. We will use the Ingress Controller simply as a way to provide HTTP access to the Dapr sidecar of our app. We do not want Dapr-to-Dapr gRPC traffic between the Ingress Controller and our app.

When ingress-nginx is installed, grab the public IP address of the service that it uses. Use kubectl get svc -n ingress-nginx. I will use the IP address with nip.io to construct a host name like app.11.12.13.14.nip.io. The nip.io service resolves such a host name to the IP address in the name automatically.

The host name will be used in the ingress and the Dapr component. In addition, use the host name to set the redirect URI of the app registration: https://app.11.12.13.14.nip.io. For example:

Added a platform configuration for a web app and set the redirect URI

Note that we are using https here. We will configure TLS on the ingress later.

Install Dapr

Install the Dapr CLI on your machine and run dapr init -k. This requires a working Kubernetes context to install Dapr to your cluster. I am using a single-node AKS cluster in Azure.

Create the Dapr component and configuration

Below is the Dapr middleware component we need. The component is called myauth. Give it any name you want. The name will later be used in a Dapr configuration that is, in turn, used by the app.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: myauth
spec:
  type: middleware.http.oauth2
  version: v1
  metadata:
  - name: clientId
    value: "CLIENTID of your app reg"
  - name: clientSecret
    value: "CLIENTSECRET that you created on the app reg"
  - name: authURL
    value: "https://login.microsoftonline.com/TENANTID/oauth2/authorize"
  - name: tokenURL
    value: "https://login.microsoftonline.com/TENANTID/oauth2/token"
  - name: redirectURL
    value: "https://app.YOUR-IP.nip.io"
  - name: authHeaderName
    value: "authorization"
  - name: forceHTTPS
    value: "true"
scopes:
- super-api

Replace YOUR-IP with the public IP address of the Ingress Controller. Also replace the TENANTID.

With the information above, Dapr can exchange the authorization code for an access token. Note that the client secret is hard-coded in the manifest. It is recommended to store it in a Kubernetes secret instead.
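To make the exchange step a bit more concrete, here is a sketch of the form-encoded body that a standard OAuth 2.0 client posts to the token URL. The field names follow the OAuth 2.0 specification; the function name and values are placeholders, and what the Dapr middleware sends exactly is an assumption on my part.

```go
package main

import (
	"fmt"
	"net/url"
)

// tokenRequestBody returns the form-encoded body of an authorization code
// exchange. The function name is hypothetical; the field names are the
// standard OAuth 2.0 ones.
func tokenRequestBody(code, clientID, clientSecret, redirectURI string) string {
	form := url.Values{}
	form.Set("grant_type", "authorization_code")
	form.Set("code", code) // the temporary code obtained after sign-in
	form.Set("client_id", clientID)
	form.Set("client_secret", clientSecret)
	form.Set("redirect_uri", redirectURI) // must match the registered redirect URI
	return form.Encode()
}

func main() {
	// Placeholder values only; a real client would POST this body to the token URL
	// with Content-Type: application/x-www-form-urlencoded.
	fmt.Println(tokenRequestBody("abc123", "CLIENTID", "SECRET", "https://app.11.12.13.14.nip.io"))
}
```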

The component on its own is not enough. We need to create a Dapr configuration that references it:

apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
  name: auth
spec:
  tracing:
    samplingRate: "1"
  httpPipeline:
    handlers:
    - name: myauth # reference the oauth component here
      type: middleware.http.oauth2    

Note that the configuration is called auth. Our app will need to use this configuration later, via an annotation on the Kubernetes pods.

Both manifests can be submitted to the cluster using kubectl apply -f. It is OK to use the default namespace for this demo. Keep the configuration and component in the same namespace as your app.

Deploy the app

The app we will deploy is super-api, which has a /source endpoint to dump all HTTP headers. When authentication is successful, the authorization header will be in the list.

Here is deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: super-api-deployment
  labels:
    app: super-api
spec:
  replicas: 1
  selector:
    matchLabels:
      app: super-api
  template:
    metadata:
      labels:
        app: super-api
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "super-api"
        dapr.io/app-port: "8080"
        dapr.io/config: "auth" # refer to Dapr config
        dapr.io/sidecar-listen-addresses: "0.0.0.0" # important
    spec:
      securityContext:
        runAsUser: 10000
        runAsNonRoot: true
      containers:
        - name: super-api
          image: ghcr.io/gbaeke/super:1.0.7
          securityContext:
            readOnlyRootFilesystem: true
            capabilities:
              drop:
                - all
          args: ["--port=8080"]
          ports:
            - name: http
              containerPort: 8080
              protocol: TCP
          env:
            - name: IPADDRESS
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: WELCOME
              value: "Hello from the Super API on AKS!!! IP is: $(IPADDRESS)"
            - name: LOG
              value: "true"       
          resources:
              requests:
                memory: "64Mi"
                cpu: "50m"
              limits:
                memory: "64Mi"
                cpu: "50m"
          livenessProbe:
            httpGet:
              path: /healthz
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 15
          readinessProbe:
              httpGet:
                path: /readyz
                port: 8080
              initialDelaySeconds: 5
              periodSeconds: 15

Note the annotations in the manifest above:

  • dapr.io/enabled: injects the Dapr sidecar in the pods
  • dapr.io/app-id: a Dapr app needs an id; a service will automatically be created with that id and -dapr appended; in our case the name will be super-api-dapr; our ingress will forward traffic to this service
  • dapr.io/app-port: Dapr will need to call endpoints in our app (after authentication in this case) so it needs the port that our app container uses
  • dapr.io/config: refers to the configuration we created above, which enables the http middleware defined by our OAuth component
  • dapr.io/sidecar-listen-addresses: ⚠️ needs to be set to “0.0.0.0”; without this setting, we will not be able to send requests to the Dapr sidecar directly from the Ingress Controller

Submit the app manifest with kubectl apply -f.

Check that the pod has two containers: the Dapr sidecar and your app container. Also check that there is a service called super-api-dapr. There is no need to create your own service. Our ingress will forward traffic to this service.

Create an ingress

In the same namespace as the app (default), create an ingress. This requires the ingress-nginx Ingress Controller we installed earlier:

apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: super-api-ingress
  namespace: default
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /
spec:
  ingressClassName: nginx
  tls:
    - hosts:
      - app.YOUR-IP.nip.io
      secretName: tls-secret 
  rules:
  - host: app.YOUR-IP.nip.io
    http:
      paths:
      - pathType: Prefix
        path: "/"
        backend:
          service:
            name: super-api-dapr
            port: 
              number: 80

Replace YOUR-IP with the public IP address of the Ingress Controller.

For this to work, you also need a secret with a certificate. Use the following commands:

openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout tls.key -out tls.crt -subj "/CN=app.YOUR-IP.nip.io"
kubectl create secret tls tls-secret --key tls.key --cert tls.crt

Replace YOUR-IP as above.

Testing the configuration

Let’s use the browser to connect to the /source endpoint. Because the request is sent to the Dapr sidecar, you need to use the Dapr invoke API. You need to speak a language that Dapr understands! The sidecar simply calls http://localhost:8080/source and sends back the response. It only calls the endpoint when authentication has succeeded; otherwise, you are redirected.

Use the following URL in the browser. It’s best to use an incognito session or private window.

https://app.20.103.17.249.nip.io/v1.0/invoke/super-api/method/source

Your browser will warn you of security risks because the certificate is not trusted. Proceed anyway! 😉

Note: we could use some URL rewriting on the ingress to avoid having to use /v1.0/invoke etc… You can also use different URL formats. See the docs.
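The invoke URL used above follows a fixed pattern: the host name, then /v1.0/invoke/, the Dapr app id, /method/, and finally the path of the endpoint in the app. A minimal sketch of that pattern (the helper name is made up for this example):

```go
package main

import (
	"fmt"
	"strings"
)

// invokeURL builds a Dapr service invocation URL of the form
// https://<host>/v1.0/invoke/<app-id>/method/<method>.
// The helper itself is hypothetical; only the URL layout comes from Dapr.
func invokeURL(host, appID, method string) string {
	// Accept both "source" and "/source" for convenience.
	method = strings.TrimPrefix(method, "/")
	return "https://" + host + "/v1.0/invoke/" + appID + "/method/" + method
}

func main() {
	// The host name is the nip.io example name used earlier in this post.
	fmt.Println(invokeURL("app.11.12.13.14.nip.io", "super-api", "/source"))
}
```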

You should get an authentication screen which indicates that the Dapr configuration is doing its thing:

Redirection to the authorize URL

After successful authentication, you should see the response from the /source endpoint of super-api:

Response from /source

The response contains an Authorization header. The header contains a JWT after the word Bearer. You can paste that JWT in https://jwt.io to see its content. We can only access the app with a valid token. That’s all we do in this case, ensuring only authenticated users can access our app.
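If you would rather inspect the token in code than paste it into a website, the sketch below shows one way to do it in Go. It splits the header value, takes the middle segment of the JWT and decodes it. The function names are made up for this example, and the code deliberately does not validate the signature, so treat it as a debugging aid only.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// bearerToken extracts the token from a header value such as "Bearer eyJ...".
func bearerToken(headerValue string) (string, error) {
	parts := strings.Fields(headerValue)
	if len(parts) != 2 || !strings.EqualFold(parts[0], "Bearer") {
		return "", errors.New("not a bearer token")
	}
	return parts[1], nil
}

// unverifiedClaims decodes the payload (second segment) of a JWT.
// It does NOT check the signature; use it for inspection only.
func unverifiedClaims(token string) (map[string]interface{}, error) {
	segments := strings.Split(token, ".")
	if len(segments) != 3 {
		return nil, errors.New("expected three dot-separated segments")
	}
	payload, err := base64.RawURLEncoding.DecodeString(segments[1])
	if err != nil {
		return nil, err
	}
	var claims map[string]interface{}
	if err := json.Unmarshal(payload, &claims); err != nil {
		return nil, err
	}
	return claims, nil
}

func main() {
	// Build a fake, unsigned token locally so the example runs offline.
	payload := base64.RawURLEncoding.EncodeToString([]byte(`{"name":"demo"}`))
	header := "Bearer header." + payload + ".signature"

	token, err := bearerToken(header)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	claims, err := unverifiedClaims(token)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(claims["name"])
}
```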

Conclusion

In this article, we used Dapr to secure access to an app without having to modify the app itself. The source code of super-api was not changed in any way to enable this functionality. Via a component and a configuration, we instructed our app’s Dapr sidecar to do all this work for us. App endpoints such as /source are only called when there is a valid token. When there is such a token, it is saved in a header of your choice.

It is important to note that we have to send HTTP requests to our app’s sidecar for this to work. To enable this, we instructed the sidecar to listen on all IP addresses of the pod, not just 127.0.0.1. That allows us to send HTTP requests to the service that Dapr creates for the app. The ingress forwards requests to the Dapr service directly. That also means that you have to call your endpoint via the Dapr invoke API. I admit that can be confusing in the beginning. 😉

Note that, at the time of this writing (June 2022), the OAuth2 middleware in Dapr is in an alpha state.

Taking Azure Container Apps for a spin

At Ignite November 2021, Microsoft released Azure Container Apps as a public preview. It allows you to run containerized applications on a serverless platform, in the sense that you do not have to worry about the underlying infrastructure.

The underlying infrastructure is Kubernetes (AKS) as the control plane with additional software such as:

  • Dapr: distributed application runtime to easily work with state, pub/sub and other Dapr building blocks
  • KEDA: Kubernetes event-driven autoscaler so you can use any KEDA supported scaler, in addition to scaling based on HTTP traffic, CPU and memory
  • Envoy: used to provide ingress functionality and traffic splitting for blue-green deployment, A/B testing, etc…

Your apps actually run on Azure Container Instances (ACI). ACI was always meant to be used as raw compute to build platforms with and this is a great use case.

Note: there is some discussion in the community whether ACI (via AKS virtual nodes) is used or not; I will leave it in for now but in the end, it does not matter too much as the service is meant to hide this complexity anyway

Azure Container Apps does not care about the runtime or programming model you use. Just use whatever feels most comfortable and package it as a container image.

In this post, we will deploy an application that uses Dapr to save state to Cosmos DB. Along the way, we will explain most of the concepts you need to understand to use Azure Container Apps in your own scenarios. The code I am using is on GitHub and written in Go.

Configure the Azure CLI

In this post, we will use the Azure CLI exclusively to perform all the steps. Instead of the Azure CLI, you can also use ARM templates or Bicep. If you want to play with a sample that deploys multiple container apps and uses Bicep, be sure to check out this great Azure sample.

You will need to have the Azure CLI installed and also add the Container Apps extension:

az extension add \
  --source https://workerappscliextension.blob.core.windows.net/azure-cli-extension/containerapp-0.2.0-py2.py3-none-any.whl

The extension allows you to use commands like az containerapp create and az containerapp update.

Create an environment

An environment runs one or more container apps. A container app can run multiple containers and can have revisions. If you know how Kubernetes works, each revision of a container app is actually a scaled collection of Kubernetes pods, using the scalers discussed above. Each revision can be thought of as a separate Kubernetes Deployment/ReplicaSet that runs a specific version of your app. Whenever you modify your app, depending on the type of modification, you get a new revision. You can have multiple active revisions and set traffic weights to distribute traffic as you wish.

Container apps, revisions, pods, and containers

Note that above, although you see multiple containers in a pod in a revision, that is not the most common use case. Most of the time, a pod will have only one application container. That is entirely up to you and the rationale behind using one or more containers is similar to multi-container pods in Kubernetes.

To create an environment, be sure to register or re-register the Microsoft.Web provider. That provider has the kubeEnvironments resource type, which represents a Container App environment.

az provider register --namespace Microsoft.Web

Next, create a resource group:

az group create --name rg-dapr --location northeurope

I have chosen North Europe here, but the location of the resource group does not really matter. What does matter is that you create the environment in either North Europe or Canada Central at this point in time (November 2021).

Every environment needs to be associated with a Log Analytics workspace. You can use that workspace later to view the logs of your container apps. Let’s create such a workspace in the resource group we just created:

az monitor log-analytics workspace create \
  --resource-group rg-dapr \
  --workspace-name dapr-logs

Next, we want to retrieve the workspace client id and secret. We will need that when we create the Container Apps environment. Commands below expect the use of bash:

LOG_ANALYTICS_WORKSPACE_CLIENT_ID=`az monitor log-analytics workspace show --query customerId -g rg-dapr -n dapr-logs --out tsv`
LOG_ANALYTICS_WORKSPACE_CLIENT_SECRET=`az monitor log-analytics workspace get-shared-keys --query primarySharedKey -g rg-dapr -n dapr-logs --out tsv`

Now we can create the environment in North Europe:

az containerapp env create \
  --name dapr-ca \
  --resource-group rg-dapr \
  --logs-workspace-id $LOG_ANALYTICS_WORKSPACE_CLIENT_ID \
  --logs-workspace-key $LOG_ANALYTICS_WORKSPACE_CLIENT_SECRET \
  --location northeurope

The Container App environment shows up in the portal like so:

Container App Environment in the portal

There is not a lot you can do in the portal, besides listing the apps in the environment. Provisioning an environment is extremely quick, in my case a matter of seconds.

Deploying Cosmos DB

We will deploy a container app that uses Dapr to write key/value pairs to Cosmos DB. Let’s deploy Cosmos DB:

uniqueId=$RANDOM
az cosmosdb create \
  --name dapr-cosmosdb-$uniqueId \
  --resource-group rg-dapr \
  --locations regionName='northeurope'

az cosmosdb sql database create \
    -a dapr-cosmosdb-$uniqueId \
    -g rg-dapr \
    -n dapr-db

az cosmosdb sql container create \
    -a dapr-cosmosdb-$uniqueId \
    -g rg-dapr \
    -d dapr-db \
    -n statestore \
    -p '/partitionKey' \
    --throughput 400

The above commands create the following resources:

  • A Cosmos DB account in North Europe: note that this uses session-level consistency (remember that for later in this post 😉)
  • A Cosmos DB database that uses the SQL API
  • A Cosmos DB container in that database, called statestore (can be anything you want)

In Cosmos DB Data Explorer, you should see:

statestore collection will be used as a State Store in Dapr

Deploying the Container App

We can use the following command to deploy the container app and enable Dapr on it:

az containerapp create \
  --name daprstate \
  --resource-group rg-dapr \
  --environment dapr-ca \
  --image gbaeke/dapr-state:1.0.0 \
  --min-replicas 1 \
  --max-replicas 1 \
  --enable-dapr \
  --dapr-app-id daprstate \
  --dapr-components ./components-cosmosdb.yaml \
  --target-port 8080 \
  --ingress external

Let’s unpack what happens when you run the above command:

  • A container app daprstate is created in environment dapr-ca
  • The container app will have an initial revision (revision 1) that runs one container in its pod; the container uses image gbaeke/dapr-state:1.0.0
  • We turn off scaling by setting min and max replicas to 1
  • We enable ingress with the type set to external. That configures a public IP address and DNS name to reach our container app on the Internet; Envoy proxy is used under the hood to achieve this; TLS is automatically configured but we do need to tell the proxy the port our app listens on (--target-port 8080)
  • Dapr is enabled and requires that our app gets a Dapr id (--enable-dapr and --dapr-app-id daprstate)

Because this app uses the Dapr SDK to write key/value pairs to a state store, we need to configure this. That is where the --dapr-components parameter comes in. The component is actually defined in a file components-cosmosdb.yaml:

- name: statestore
  type: state.azure.cosmosdb
  version: v1
  metadata:
    - name: url
      value: YOURURL
    - name: masterkey
      value: YOURMASTERKEY
    - name: database
      value: YOURDB
    - name: collection
      value: YOURCOLLECTION

In the file, the name of our state store is statestore but you can choose any name. The type has to be state.azure.cosmosdb which requires the use of several metadata fields to specify the URL to your Cosmos DB account, the key to authenticate, the database, and collection.

In the Go code, the name of the state store is configurable via environment variables or arguments and, by total coincidence, defaults to statestore 😉.

func main() {
	fmt.Printf("Welcome to super api\n\n")

	// flags
	// ... code omitted for brevity
	// State store name
	f.String("statestore", "statestore", "State store name")

The flag is used in the code that writes to Cosmos DB with the Dapr SDK (s.config.Statestore in the call to daprClient.SaveState below):

	// write data to Dapr statestore
	ctx := r.Context()
	if err := s.daprClient.SaveState(ctx, s.config.Statestore, state.Key, []byte(state.Data)); err != nil {
		// report the failure to the caller and stop
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintf(w, "Error writing to statestore: %v\n", err)
		return
	}
	w.WriteHeader(http.StatusOK)
	fmt.Fprintf(w, "Successfully wrote to statestore\n")

After running the az containerapp create command, you should see the following output (redacted):

{
  "configuration": {
    "activeRevisionsMode": "Multiple",
    "ingress": {
      "allowInsecure": false,
      "external": true,
      "fqdn": "daprstate.politegrass-37c1a51f.northeurope.azurecontainerapps.io",
      "targetPort": 8080,
      "traffic": [
        {
          "latestRevision": true,
          "revisionName": null,
          "weight": 100
        }
      ],
      "transport": "Auto"
    },
    "registries": null,
    "secrets": null
  },
  "id": "/subscriptions/SUBID/resourceGroups/rg-dapr/providers/Microsoft.Web/containerApps/daprstate",
  "kind": null,
  "kubeEnvironmentId": "/subscriptions/SUBID/resourceGroups/rg-dapr/providers/Microsoft.Web/kubeEnvironments/dapr-ca",
  "latestRevisionFqdn": "daprstate--6sbsmip.politegrass-37c1a51f.northeurope.azurecontainerapps.io",
  "latestRevisionName": "daprstate--6sbsmip",
  "location": "North Europe",
  "name": "daprstate",
  "provisioningState": "Succeeded",
  "resourceGroup": "rg-dapr",
  "tags": null,
  "template": {
    "containers": [
      {
        "args": null,
        "command": null,
        "env": null,
        "image": "gbaeke/dapr-state:1.0.0",
        "name": "daprstate",
        "resources": {
          "cpu": 0.5,
          "memory": "1Gi"
        }
      }
    ],
    "dapr": {
      "appId": "daprstate",
      "appPort": null,
      "components": [
        {
          "metadata": [
            {
              "name": "url",
              "secretRef": "",
              "value": "https://ACCOUNTNAME.documents.azure.com:443/"
            },
            {
              "name": "masterkey",
              "secretRef": "",
              "value": "MASTERKEY"
            },
            {
              "name": "database",
              "secretRef": "",
              "value": "dapr-db"
            },
            {
              "name": "collection",
              "secretRef": "",
              "value": "statestore"
            }
          ],
          "name": "statestore",
          "type": "state.azure.cosmosdb",
          "version": "v1"
        }
      ],
      "enabled": true
    },
    "revisionSuffix": "",
    "scale": {
      "maxReplicas": 1,
      "minReplicas": 1,
      "rules": null
    }
  },
  "type": "Microsoft.Web/containerApps"
}

The output above gives you a hint on how to define the Container App in an ARM template. Note the template section. It defines the containers that are part of this app. We have only one container with default resource allocations. It is possible to set environment variables for your containers but there are none in this case. We will set one later.

Also note the dapr section. It defines the app’s Dapr id and the components it can use.

Note: it is not a good practice to enter secrets in configuration files as we did above. To fix that:

  • add a secret to the Container App in the az containerapp create command via the --secrets flag. E.g. --secrets cosmosdb='YOURCOSMOSDBKEY'
  • in components-cosmosdb.yaml, replace value: YOURMASTERKEY with secretRef: cosmosdb

The URL for the app is https://daprstate.politegrass-37c1a51f.northeurope.azurecontainerapps.io. When I browse to it, I just get a welcome message: Hello from Super API on Container Apps.

Every revision also gets a URL. The revision URL is https://daprstate--6sbsmip.politegrass-37c1a51f.northeurope.azurecontainerapps.io. Of course, this revision URL gives the same result. Our app has only one revision.

Save state

The application has a /state endpoint you can post a JSON payload to in the form of:

{
  "key": "keyname",
  "data": "datatostoreinkey"
}
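On the Go side, a payload like this is typically decoded into a small struct. The sketch below is not the code from the repository; the type and function names are made up, and only the key and data fields come from the payload shown above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// statePayload mirrors the JSON body accepted by the /state endpoint.
// The type name is hypothetical.
type statePayload struct {
	Key  string `json:"key"`
	Data string `json:"data"`
}

// parseState decodes the request body and rejects payloads without a key.
// Rejecting an empty key is a choice made for this sketch.
func parseState(body []byte) (statePayload, error) {
	var p statePayload
	if err := json.Unmarshal(body, &p); err != nil {
		return statePayload{}, err
	}
	if p.Key == "" {
		return statePayload{}, fmt.Errorf("key must not be empty")
	}
	return p, nil
}

func main() {
	p, err := parseState([]byte(`{"key": "cool", "data": "somedata"}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("key=%s data=%s\n", p.Key, p.Data)
}
```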

We can use curl to try this:

curl -v -H "Content-type: application/json" -d '{ "key": "cool","data": "somedata"}' 'https://daprstate.politegrass-37c1a51f.northeurope.azurecontainerapps.io/state'

Trying the curl command will result in an error because Dapr wants to use strong consistency with Cosmos DB and we configured it for session-level consistency. That is not very relevant for now as that is related to Dapr and not Container Apps. Switching the Cosmos DB account to strong consistency will fix the error.

Update the container app

Let’s see what happens when we update the container app. We will add an environment variable WELCOME to change the welcome message that the app displays. Run the following command:

az containerapp update \
  --name daprstate \
  --resource-group rg-dapr \
  --environment-variables WELCOME='Hello from new revision'

The template section in the JSON output is now:

"template": {
    "containers": [
      {
        "args": null,
        "command": null,
        "env": [
          {
            "name": "WELCOME",
            "secretRef": null,
            "value": "Hello from new revision"
          }
        ],
        "image": "gbaeke/dapr-state:1.0.0",
        "name": "daprstate",
        "resources": {
          "cpu": 0.5,
          "memory": "1Gi"
        }
      }
    ]

It is important to realize that, when the template changes, a new revision will be created. We now have two revisions, reflected in the portal as below:

Container App with two revisions

The new revision is active and receives 100% of the traffic. When we hit the / endpoint, we get Hello from new revision.

The idea here is that you deploy a new revision and test it before you make it active. Another option is to send a small part of the traffic to the new revision and see how that goes. It’s not entirely clear to me how you can automate this, including automated tests, similar to how progressive delivery controllers like Argo Rollouts and Flagger work. Tip to the team to include this! 😉
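To get a feel for what a traffic split means, here is a toy model in Go. It is purely conceptual and says nothing about how the platform or Envoy implements splitting; the type, the function and the second revision name are invented for this example.

```go
package main

import "fmt"

// revisionWeight pairs a revision name with its share of traffic in percent.
// This is a toy model, not a Container Apps API type.
type revisionWeight struct {
	Name   string
	Weight int
}

// pickRevision maps a roll in [0,100) to a revision by walking the
// cumulative weights. It returns "" if the weights do not cover the roll.
func pickRevision(weights []revisionWeight, roll int) string {
	cumulative := 0
	for _, w := range weights {
		cumulative += w.Weight
		if roll < cumulative {
			return w.Name
		}
	}
	return ""
}

func main() {
	weights := []revisionWeight{
		{Name: "daprstate--6sbsmip", Weight: 90},
		{Name: "daprstate--newrev", Weight: 10}, // hypothetical new revision
	}
	// Walk every possible roll once to show the resulting distribution.
	counts := map[string]int{}
	for roll := 0; roll < 100; roll++ {
		counts[pickRevision(weights, roll)]++
	}
	fmt.Println(counts["daprstate--6sbsmip"], counts["daprstate--newrev"])
}
```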

The az containerapp create and update commands can take a lot of parameters. Use az containerapp update --help to check what is supported. You will also see several examples.

Check the logs

Let’s check the container app logs that are sent to the Log Analytics workspace attached to the Container App environment. Make sure you still have the log analytics id in $LOG_ANALYTICS_WORKSPACE_CLIENT_ID:

az monitor log-analytics query \
  --workspace $LOG_ANALYTICS_WORKSPACE_CLIENT_ID \
  --analytics-query "ContainerAppConsoleLogs_CL | where ContainerAppName_s == 'daprstate' | project ContainerAppName_s, Log_s, TimeGenerated | take 50" \
  --out table

This will display both logs from the application container and the Dapr logs. One of the log entries shows that the statestore was successfully initialized:

... msg="component loaded. name: statestore, type: state.azure.cosmosdb/v1"

Conclusion

We have only scratched the surface here but I hope this post gave you some insights into concepts such as environments, container apps, revisions, ingress, the use of Dapr and logging. There is much more to look at, such as virtual network integration, setting up scale rules (e.g. KEDA) and automated deployments. Stay tuned!

From MQTT to InfluxDB with Dapr

In a previous post, we looked at using the Dapr InfluxDB component to write data to InfluxDB Cloud. In this post, we will take a look at reading data from an MQTT topic and storing it in InfluxDB. We will use Dapr 0.10, which includes both components.

To get up to speed with Dapr, please read the previous post and make sure you have an InfluxDB instance up and running in the cloud.

If you want to see a video instead:

MQTT to Influx with Dapr

Note that the video sends output to both InfluxDB and Azure SignalR. In addition, the video uses Dapr 0.8 with a custom compiled Dapr because I was still developing and testing the InfluxDB component.

MQTT Server

Although there are cloud-based MQTT servers you can use, let’s mix it up a little and run the MQTT server from Docker. If you have Docker installed, type the following:

docker run -it -p 1883:1883 -p 9001:9001 eclipse-mosquitto

The above command runs Mosquitto and exposes port 1883 on your local machine. You can use a tool such as MQTT Explorer to send data. Install MQTT Explorer on your local machine and run it. Create a connection like in the below screenshot:

MQTT Explorer connection

Now, click Connect to connect to Mosquitto. With MQTT, you send data to topics of your choice. Publish a JSON message to a topic called test as shown below:

Publish json data to the test topic

You can now click the topic in the list of topics and see its most recent value:

Subscribing to the test topic

Using MQTT with Dapr

You are now ready to read data from an MQTT topic with Dapr. If you have Dapr installed, you can run the following code to read from the test topic and store the data in InfluxDB:

const express = require('express');
const bodyParser = require('body-parser');

const app = express();
app.use(bodyParser.json());

const port = 3000;

// the mqtt component will post messages from the MQTT topic here
app.post('/mqtt', (req, res) => {
    console.log("MQTT Binding Trigger");
    console.log(req.body)

    // body is expected to contain room and temperature
    let room = req.body.room;
    const temperature = req.body.temperature;

    // room should not contain spaces
    room = room.split(" ").join("_");

    // create message for influx component
    const message = {
        "measurement": "stat",
        "tags": `room=${room}`,
        "values": `temperature=${temperature}`
    };
    
    // send the message to influx output binding
    res.send({
        "to": ["influx"],
        "data": message
    });
});

app.listen(port, () => console.log(`Node App listening on port ${port}!`));

In this example, we use Node.js instead of Python to illustrate that Dapr works with any language. You will also need the package.json below. Run npm install after creating it:

{
  "name": "mqttapp",
  "version": "1.0.0",
  "description": "",
  "main": "app.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "",
  "license": "ISC",
  "dependencies": {
    "body-parser": "^1.18.3",
    "express": "^4.16.4"
  }
}

In the previous post about InfluxDB, we used an output binding. You use an output binding by posting data to a Dapr HTTP URI.

To use an input binding like MQTT, you will need to create an HTTP server. Above, we create an HTTP server with Express, and listen on port 3000 for incoming requests. Later, we will instruct Dapr to listen for messages on an MQTT topic and, when a message arrives, post it to our server. We can then retrieve the message from the request body.

To tell Dapr what to do, we’ll create a components folder in the same folder that holds the Node.js code. Put a file in that folder with the following contents:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt
spec:
  type: bindings.mqtt
  metadata:
  - name: url
    value: mqtt://localhost:1883
  - name: topic
    value: test

Above, we configure the MQTT component to listen to topic test on mqtt://localhost:1883. The name we use (in metadata) is important because it needs to correspond to our HTTP handler (/mqtt).

Like in the previous post, there’s another file that configures the InfluxDB component:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: influx
spec:
  type: bindings.influx
  metadata:
  - name: Url
    value: http://localhost:9999
  - name: Token
    value: ""
  - name: Org
    value: ""
  - name: Bucket
    value: ""

Replace the parameters in the file above with your own.

Saving the MQTT request body to InfluxDB

If you look at the Node.js code, you have probably noticed that we send a response body in the /mqtt handler:

res.send({
    "to": ["influx"],
    "data": message
});

Dapr accepts JSON responses from the handler that include a to and a data field. The above response simply tells Dapr to send the message in the data field to the configured influx component.
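If your handler were written in Python instead of Node.js, the same response could be built roughly as follows. This is only a sketch of the response shape: the function name build_influx_response is hypothetical, and the field values mirror the Node.js handler above.

```python
def build_influx_response(room, temperature):
    """Return the JSON-serializable body that forwards data to the influx binding.

    build_influx_response is a hypothetical name used for illustration.
    """
    # room should not contain spaces (same rule as in the Node.js handler)
    room = room.replace(" ", "_")

    # message for the influx component
    message = {
        "measurement": "stat",
        "tags": f"room={room}",
        "values": f"temperature={temperature}",
    }

    # "to" lists the output bindings that should receive "data"
    return {"to": ["influx"], "data": message}


print(build_influx_response("living room", 21.5))
```

Whatever web framework you use, returning this dictionary as the JSON response of the /mqtt handler should have the same effect as the res.send call above.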

Does it work?

Let’s run the code with Dapr to see if it works:

dapr run --app-id mqqtinflux --app-port 3000 --components-path=./components node app.js

In dapr run, we also need to specify the port our app uses. Remember that Dapr will post JSON data to our /mqtt handler!

Let’s post some JSON with the expected fields of temperature and room to our MQTT server:

Posting data to the test topic
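If you want to generate the payload from code rather than type it into an MQTT client, a small Python sketch could look like this. The function name and the sample values are made up; only the two field names, room and temperature, come from the handler.

```python
import json


def make_payload(room, temperature):
    """Serialize a reading with the two fields the /mqtt handler expects.

    make_payload and the sample values below are hypothetical.
    """
    return json.dumps({"room": room, "temperature": temperature})


payload = make_payload("living room", 21.5)
print(payload)
```

Publish the resulting string to the test topic with any MQTT client.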

The Dapr logs show the following:

Logs from the APP (appear alongside the Dapr logs)

In InfluxDB Cloud table view:

Data stored in InfluxDB Cloud (posted some other data points before)

Conclusion

Dapr makes it really easy to retrieve data with input bindings and send that data somewhere else with output bindings. There are many other input and output bindings so make sure you check them out on GitHub!

Dapr Service Invocation between an HTTP Python client and a GRPC Go server

Recently, I published several videos about Dapr on my YouTube channel. The videos cover the basics of state management, PubSub and service invocation.

The Getting Started video with state management and service invocation:

Let’s take a closer look at service invocation with HTTP, Python and Node.

Service Invocation with HTTP

Service invocation (from Dapr docs)

The services you write (here Service A and B) talk to each other using the Dapr runtime. On Kubernetes, you talk to a Dapr sidecar deployed alongside your service container. On your development machine, you run your services via dapr run.

If you want to expose a method on Service B and you use HTTP, you just need to expose an HTTP handler or route. For example, with Express in Node you would use something like:

const express = require('express');
const app = express();
app.post('/neworder', (req, res) => {
    // your code goes here
    res.sendStatus(200);
});

You then run your service and annotate it with the proper Dapr annotations (Kubernetes):

annotations:
        dapr.io/enabled: "true"
        dapr.io/id: "node"
        dapr.io/port: "3000"

On your local machine, you would just run the service via dapr run:

dapr run --app-id node --app-port 3000 node app.js

In the last example, the Dapr id is node and we indicate that the service is listening on port 3000. To invoke the method from service A, it can use the following code (Python example shown):

import os
import requests

# use DAPR_HTTP_PORT when it is set, otherwise fall back to port 3500
dapr_port = os.getenv("DAPR_HTTP_PORT", 3500)
dapr_url = "http://localhost:{}/v1.0/invoke/node/method/neworder".format(dapr_port)
message = {"data": {"orderId": 1234}}
response = requests.post(dapr_url, json=message, timeout=5)

As you can see, service A does not contact service B directly. It just talks to its Dapr sidecar on localhost (or Dapr on your dev machine) and asks it to invoke the neworder method on the service with Dapr id node. In this example, both service A and B use HTTP only. Because you just use HTTP to expose and invoke methods, you can use any language or framework.

You can find a complete example here with Node and Python.

Service Invocation with HTTP and GRPC

Dapr has SDKs available for C#, Go and other languages. You might prefer those over the generic HTTP approach. In the case of Go, the SDK uses GRPC to interface with the Dapr runtime. With Dapr in between, one service can use HTTP while another uses GRPC.

Let’s take a look at a service that exposes a method (HelloFromGo) from a Go application. The full example is here. Instead of creating an HTTP route with the name of your method, you use an OnInvoke handler that looks like this (only the start is shown, see the full code):

func (s *server) OnInvoke(ctx context.Context, in *commonv1pb.InvokeRequest) (*commonv1pb.InvokeResponse, error) {
	var response string

	switch in.Method {
	case "HelloFromGo":

		response = s.HelloFromGo()

Naturally, you also have to implement a HelloFromGo() method:

// HelloFromGo is a simple demo method to invoke
func (s *server) HelloFromGo() string {
	return "Hello"
}

Another service can use any language or framework and invoke the above method with a POST to the following URL if the Dapr id of the Go service is goserver:

http://localhost:3500/v1.0/invoke/goserver/method/HelloFromGo

A POST to the above URL tells Dapr to execute the OnInvoke method via GRPC, which will run the HelloFromGo function. It is perfectly possible to include a payload in your POST and have the OnInvoke handler process that payload. The full example is here; it also includes sending and processing a JSON payload and sending back a text response. You will need a basic understanding of GRPC and protocol buffers. A good book on GRPC is the following one: https://learning.oreilly.com/library/view/grpc-up-and/9781492058328/.
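To make the HTTP side concrete, here is a rough Python sketch of a client for the Go service. It is not the code from the full example: invoke_url and hello_from_go are names I made up, and the sketch assumes the Go service runs with Dapr id goserver and that the sidecar listens on DAPR_HTTP_PORT or 3500.

```python
import os
import urllib.request


def invoke_url(app_id, method, dapr_port=None):
    """Build the Dapr service invocation URL (hypothetical helper)."""
    # Fall back to DAPR_HTTP_PORT, then to the assumed default of 3500
    port = dapr_port or os.getenv("DAPR_HTTP_PORT", "3500")
    return f"http://localhost:{port}/v1.0/invoke/{app_id}/method/{method}"


def hello_from_go(dapr_port=None, timeout=5):
    """POST to the HelloFromGo method and return the response text.

    Requires a running Dapr sidecar and the Go service with id goserver.
    """
    request = urllib.request.Request(
        invoke_url("goserver", "HelloFromGo", dapr_port),
        data=b"",  # empty body; the method takes no payload
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read().decode("utf-8")


print(invoke_url("goserver", "HelloFromGo", 3500))
```

The client only speaks HTTP to its own sidecar; whether the target service is reached over HTTP or GRPC is left to Dapr.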

Conclusion

Dapr allows you to choose between HTTP and GRPC interfaces to interact with the runtime. You can choose whatever is most comfortable to you. One team can use HTTP with Python or JavaScript while other teams use GRPC with their language of choice. Whatever you choose, the Dapr runtime will make sure service invocation just works, allowing you to focus on the code.
