From MQTT to InfluxDB with Dapr

In a previous post, we looked at using the Dapr InfluxDB component to write data to InfluxDB Cloud. In this post, we will take a look at reading data from an MQTT topic and storing it in InfluxDB. We will use Dapr 0.10, which includes both components.

To get up to speed with Dapr, please read the previous post and make sure you have an InfluxDB instance up and running in the cloud.

If you want to see a video instead:

MQTT to Influx with Dapr

Note that the video sends output to both InfluxDB and Azure SignalR. In addition, the video uses Dapr 0.8 with a custom compiled Dapr because I was still developing and testing the InfluxDB component.

MQTT Server

Although there are cloud-based MQTT servers you can use, let’s mix it up a little and run the MQTT server from Docker. If you have Docker installed, type the following:

docker run -it -p 1883:1883 -p 9001:9001 eclipse-mosquitto

The above command runs Mosquitto and exposes port 1883 on your local machine. You can use a tool such as MQTT Explorer to send data. Install MQTT Explorer on your local machine and run it. Create a connection like in the below screenshot:

MQTT Explorer connection

Now, click Connect to connect to Mosquitto. With MQTT, you send data to topics of your choice. Publish a JSON message to a topic called test as shown below:

Publish JSON data to the test topic

You can now click the topic in the list of topics and see its most recent value:

Subscribing to the test topic

Using MQTT with Dapr

You are now ready to read data from an MQTT topic with Dapr. If you have Dapr installed, you can run the following code to read from the test topic and store the data in InfluxDB:

const express = require('express');
const bodyParser = require('body-parser');

const app = express();
app.use(bodyParser.json());

const port = 3000;

// the mqtt component will post messages from the test topic here
app.post('/mqtt', (req, res) => {
    console.log("MQTT Binding Trigger");
    console.log(req.body)

    // body is expected to contain room and temperature
    let room = req.body.room
    const temperature = req.body.temperature

    // room should not contain spaces
    room = room.split(" ").join("_")

    // create message for influx component
    const message = {
        "measurement": "stat",
        "tags": `room=${room}`,
        "values": `temperature=${temperature}`
    };
    
    // send the message to influx output binding
    res.send({
        "to": ["influx"],
        "data": message
    });
});

app.listen(port, () => console.log(`Node App listening on port ${port}!`));

In this example, we use Node.js instead of Python to illustrate that Dapr works with any language. You will also need the package.json below. Run npm install to install the dependencies:

{
  "name": "mqttapp",
  "version": "1.0.0",
  "description": "",
  "main": "app.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "",
  "license": "ISC",
  "dependencies": {
    "body-parser": "^1.18.3",
    "express": "^4.16.4"
  }
}

In the previous post about InfluxDB, we used an output binding. You use an output binding by posting data to a Dapr HTTP URI.

To use an input binding like MQTT, you will need to create an HTTP server. Above, we create an HTTP server with Express, and listen on port 3000 for incoming requests. Later, we will instruct Dapr to listen for messages on an MQTT topic and, when a message arrives, post it to our server. We can then retrieve the message from the request body.
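Because Dapr only needs an HTTP endpoint to post to, the same idea works in any language. Below is a minimal sketch of such an endpoint in Python, using only the standard library. It assumes that Dapr POSTs each message as JSON to the /mqtt route, like in the Node.js code. The names BindingHandler, make_server and received are made up for this illustration.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

# messages delivered by Dapr, kept in memory for inspection (illustrative only)
received = []


class BindingHandler(BaseHTTPRequestHandler):
    """Accepts the POST requests that Dapr sends for the input binding."""

    def do_POST(self):
        # only the route that matches the binding is handled
        if self.path != "/mqtt":
            self.send_error(404)
            return

        # read and parse the JSON body posted by Dapr
        length = int(self.headers.get("Content-Length", 0))
        body = json.loads(self.rfile.read(length) or b"{}")
        received.append(body)

        # a 200 response tells Dapr the message was handled
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(b"{}")

    def log_message(self, fmt, *args):
        # silence the default request logging
        pass


def make_server(port=3000):
    """Create the HTTP server; call serve_forever() on the result to run it."""
    return HTTPServer(("127.0.0.1", port), BindingHandler)
```

To try it, call make_server().serve_forever() and start the script with dapr run and --app-port 3000, just like the Node.js app.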

To tell Dapr what to do, we’ll create a components folder in the same folder that holds the Node.js code. Put a file in that folder with the following contents:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt
spec:
  type: bindings.mqtt
  metadata:
  - name: url
    value: mqtt://localhost:1883
  - name: topic
    value: test

Above, we configure the MQTT component to listen to topic test on mqtt://localhost:1883. The name we use (in metadata) is important because it needs to correspond to our HTTP handler (/mqtt).

Like in the previous post, there’s another file that configures the InfluxDB component:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: influx
spec:
  type: bindings.influx
  metadata:
  - name: Url
    value: http://localhost:9999
  - name: Token
    value: ""
  - name: Org
    value: ""
  - name: Bucket
    value: ""

Replace the parameters in the file above with your own.

Saving the MQTT request body to InfluxDB

If you look at the Node.js code, you have probably noticed that we send a response body in the /mqtt handler:

res.send({
        "to": ["influx"],
        "data": message
    });

Dapr is written to accept responses that include a to and a data field in the JSON response. The above response simply tells Dapr to send the message in the data field to the configured influx component.
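To see the shape of that response in isolation, here is a small Python sketch that mirrors what the Node.js handler does with the request body. The function name to_influx_response is made up for this example. The field names come from the code above.

```python
def to_influx_response(body):
    """Turn an MQTT message body into a response that Dapr forwards to influx."""
    # room should not contain spaces, so replace them with underscores
    room = str(body["room"]).replace(" ", "_")
    temperature = body["temperature"]

    # message in the format expected by the influx component
    message = {
        "measurement": "stat",
        "tags": f"room={room}",
        "values": f"temperature={temperature}",
    }

    # "to" lists the output bindings, "data" is what they receive
    return {"to": ["influx"], "data": message}


print(to_influx_response({"room": "living room", "temperature": 21.5}))
```

The to field is a list, which is why the component name is wrapped in brackets.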

Does it work?

Let’s run the code with Dapr to see if it works:

dapr run --app-id mqqtinflux --app-port 3000 --components-path=./components node app.js

In dapr run, we also need to specify the port our app uses. Remember that Dapr will post JSON data to our /mqtt handler!

Let’s post some JSON with the expected fields of temperature and room to our MQTT server:

Posting data to the test topic

The Dapr logs show the following:

Logs from the APP (appear alongside the Dapr logs)

In InfluxDB Cloud table view:

Data stored in InfluxDB Cloud (posted some other data points before)

Conclusion

Dapr makes it really easy to retrieve data with input bindings and send that data somewhere else with output bindings. There are many other input and output bindings so make sure you check them out on GitHub!

Using the Dapr InfluxDB component

A while ago, I created a component that can write to InfluxDB 2.0 from Dapr. This component is now included in the 0.10 release. In this post, we will briefly look at how you can use it.

If you do not know what Dapr is, take a look at https://dapr.io. I also have some videos on Youtube about Dapr. And be sure to check out the video below as well:

Let’s jump in and use the component.

Installing Dapr

You can install Dapr on Windows, Mac and Linux by following the instructions on https://dapr.io/. Just click the Download link and select your operating system. I installed Dapr on WSL 2 (Windows Subsystem for Linux) on Windows 10 with the following command:

wget -q https://raw.githubusercontent.com/dapr/cli/master/install/install.sh -O - | /bin/bash

The above command just installs the Dapr CLI. To initialize Dapr, you need to run dapr init.

Getting an InfluxDB database

InfluxDB is a time-series database. You can easily run it in a container on your local machine but you can also use InfluxDB Cloud. In this post, we will simply use a free cloud instance. Head over to https://cloud2.influxdata.com/signup and sign up for an account. Follow the steps and choose a free account. It stores data for a maximum of 30 days and has some other limits as well.

You will need the following information to write data to InfluxDB:

  • Organization: this will be set to the e-mail account you signed up with; it can be renamed if you wish
  • Bucket: your data is stored in a bucket; by default you get a bucket called e-mail-prefix’s Bucket (e.g. geert.baeke’s Bucket)
  • Token: you need a token that provides the necessary access rights such as read and/or write

Let’s rename the bucket to get a feel for the user interface. Click Data, Buckets and then Settings as shown below:

Getting to the bucket settings

Click Rename and follow the steps to rename the bucket:

Renaming the bucket

Now, let’s create a token. In the Load Data screen, click Tokens. Click Generate and then click Read/Write Token. Describe the token and create it like below:

Creating a token

Now click the token you created and copy it to the clipboard. You now have the organization name, a bucket name and a token. You still need a URL to connect to, but that is just the URL you see in the browser (the yellow part):

URL to send your data

Your URL will depend on the cloud you use.

Python code to write to InfluxDB with Dapr

The code below requires Python 3. I used version 3.6.9 but it will work with more recent versions of course.

import time
import requests
import os

dapr_port = os.getenv("DAPR_HTTP_PORT", 3500)

dapr_url = "http://localhost:{}/v1.0/bindings/influx".format(dapr_port)
n = 0.0
while True:
    n += 1.0
    payload = { 
        "data": {
            "measurement": "temp",
            "tags": "room=dorm,building=building-a",
            "values": "sensor=\"sensor X\",avg={},max={}".format(n, n*2)
            }, 
        "operation": "create" 
    }
    print(payload, flush=True)
    try:
        response = requests.post(dapr_url, json=payload)
        print(response, flush=True)

    except Exception as e:
        print(e, flush=True)

    time.sleep(1)

The code above is just an illustration of using the InfluxDB output binding from Dapr. It is crucial to understand that the program communicates with a Dapr process, which needs to be running either locally on your system or as a Kubernetes sidecar. To that end, we get the Dapr port number from an environment variable or use the default port 3500.

The Python program uses the InfluxDB output binding simply by posting data to an HTTP endpoint. The endpoint is constructed as follows:

dapr_url = "http://localhost:{}/v1.0/bindings/influx".format(dapr_port)

The dapr_url above is set to a URI that uses localhost over the Dapr port and then uses the influx binding by appending /v1.0/bindings/influx. All bindings have a specific name like influx, mqtt, etc… and that name is then added to /v1.0/bindings/ to make the call work.

So far so good, but how does the binding know where to connect and what organization, bucket and token to use? That’s where the component .yaml file comes in. In the same folder where you save your Python code, create a folder called components. In the folder, create a file called influx.yaml (you can give it any name you want). The contents of influx.yaml are shown below:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: influx
spec:
  type: bindings.influx
  metadata:
  - name: Url
    value: YOUR URL
  - name: Token
    value: "YOUR TOKEN HERE"
  - name: Org
    value: "YOUR ORG"
  - name: Bucket
    value: "YOUR BUCKET"

Of course, replace the uppercase values above with your own. We will later tell Dapr to look for files like this in the components folder. When your Python code posts to /v1.0/bindings/influx, Dapr uses the component whose metadata name is influx (here of type bindings.influx) and retrieves the required metadata from it. If any of the metadata is not set or if the file is missing or improperly formatted, you will get an error.

To actually use the binding, we need to post some data to the URI we constructed. The data we send is in the payload variable as shown below:

 payload = { 
        "data": {
            "measurement": "temp",
            "tags": "room=dorm,building=building-a",
            "values": "sensor=\"sensor X\",avg={},max={}".format(n, n*2)
            }, 
        "operation": "create" 
    }

It requires a measurement field, a tags field and a values field, and uses the InfluxDB line protocol to send the data. You can find more information about that here.

The data field in the payload is specific to the Influx component. The operation field is required by this Dapr component as it is written to listen for create operations.
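To get a feel for how the three fields relate to the line protocol, here is a small sketch that joins them into one line of the form measurement,tag_set field_set. The line protocol format itself is documented by InfluxDB. How the Dapr component assembles the line internally is an assumption here, and to_line_protocol is a made-up helper.

```python
def to_line_protocol(measurement, tags, values):
    """Join measurement, tags and values into one InfluxDB line protocol line."""
    if not measurement or not values:
        raise ValueError("measurement and values are required")

    # tags are optional in the line protocol; fields are not
    head = f"{measurement},{tags}" if tags else measurement
    return f"{head} {values}"


n = 1.0
line = to_line_protocol(
    "temp",
    "room=dorm,building=building-a",
    "sensor=\"sensor X\",avg={},max={}".format(n, n * 2),
)
print(line)
# prints: temp,room=dorm,building=building-a sensor="sensor X",avg=1.0,max=2.0
```

Note that string field values such as sensor X need double quotes, which is why the quotes are escaped in the payload.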

Running the code

On your local machine, you will need to run Dapr together with your code to make it work. You use dapr run for this. To run the Python code (saved to app.py in my case), run the command below from the folder that contains the code and the components folder:

dapr run --app-id influx -d ./components python3 app.py

This starts Dapr and our application with app id influx. With -d, we point to the components folder.

When you run the code, Dapr logs and your logs will be printed to the screen. In InfluxDB Cloud, we can check the data from the user interface:

Data Explorer (Note: other organization and bucket than the one used in this post)

Conclusion

Dapr can be used in the cloud and at the edge, in containers or without. In both cases, you often have to write data to databases. With Dapr, you can now easily write data as time series to InfluxDB. Note that Dapr also has an MQTT input and output binding. Using the same simple technique you learned in this post, you can easily read data from an MQTT topic and forward it to InfluxDB. In a later post, we will take a look at that scenario as well. Or check this video instead: https://youtu.be/2vCT79KG24E. Note that the video uses a custom compiled Dapr 0.8 with the InfluxDB component because this video was created during development.

Dapr Service Invocation between an HTTP Python client and a GRPC Go server

Recently, I published several videos about Dapr on my Youtube channel. The videos cover the basics of state management, PubSub and service invocation.

The Getting Started video covers state management and service invocation:

Let’s take a closer look at service invocation with HTTP, Python and Node.

Service Invocation with HTTP

Service Invocation Diagram
Service invocation (image from Dapr docs)

The services you write (here Service A and B) talk to each other using the Dapr runtime. On Kubernetes, you talk to a Dapr sidecar deployed alongside your service container. On your development machine, you run your services via dapr run.

If you want to expose a method on Service B and you use HTTP, you just need to expose an HTTP handler or route. For example, with Express in Node you would use something like:

const express = require('express');
const app = express();
app.post('/neworder', (req, res) => { /* your code */ });

You then run your service and annotate it with the proper Dapr annotations (Kubernetes):

annotations:
        dapr.io/enabled: "true"
        dapr.io/id: "node"
        dapr.io/port: "3000"

On your local machine, you would just run the service via dapr run:

dapr run --app-id node --app-port 3000 node app.js

In the last example, the Dapr id is node and we indicate that the service is listening on port 3000. To invoke the method from service A, it can use the following code (Python example shown):

dapr_port = os.getenv("DAPR_HTTP_PORT", 3500)
dapr_url = "http://localhost:{}/v1.0/invoke/node/method/neworder".format(dapr_port)
message = {"data": {"orderId": 1234}}
response = requests.post(dapr_url, json=message, timeout=5)

As you can see, service A does not contact service B directly. It just talks to its Dapr sidecar on localhost (or Dapr on your dev machine) and asks it to invoke the neworder method via a service that uses Dapr id node. It is also clear that both service A and B use HTTP only. Because you just use HTTP to expose and invoke methods, you can use any language or framework.
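The invoke URL always follows the same pattern, so it can be wrapped in a small helper. The sketch below is only an illustration. The name invoke_url is made up, and the port falls back to DAPR_HTTP_PORT or 3500 like in the snippet above.

```python
import os


def invoke_url(app_id, method, port=None):
    """Build the Dapr service invocation URL for a method on another service."""
    # use the given port, else the port of the local Dapr process
    port = port or os.getenv("DAPR_HTTP_PORT", "3500")
    return f"http://localhost:{port}/v1.0/invoke/{app_id}/method/{method}"


print(invoke_url("node", "neworder", 3500))
# prints: http://localhost:3500/v1.0/invoke/node/method/neworder
```

The caller only needs the Dapr id and the method name. It does not need to know where the other service runs.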

You can find a complete example here with Node and Python.

Service Invocation with HTTP and GRPC

Dapr has SDKs available for C#, Go and other languages. You might prefer those over the generic HTTP approach. In the case of Go, the SDK uses GRPC to interface with the Dapr runtime. With Dapr in between, one service can use HTTP while another uses GRPC.

Let’s take a look at a service that exposes a method (HelloFromGo) from a Go application. The full example is here. Instead of creating an HTTP route with the name of your method, you use an OnInvoke handler that looks like this (only the start is shown, see the full code):

func (s *server) OnInvoke(ctx context.Context, in *commonv1pb.InvokeRequest) (*commonv1pb.InvokeResponse, error) {
	var response string

	switch in.Method {
	case "HelloFromGo":

		response = s.HelloFromGo()

Naturally, you also have to implement a HelloFromGo() method:

// HelloFromGo is a simple demo method to invoke
func (s *server) HelloFromGo() string {
	return "Hello"

}

Another service can use any language or framework and invoke the above method with a POST to the following URL if the Dapr id of the Go service is goserver:

http://localhost:3500/v1.0/invoke/goserver/method/HelloFromGo

A POST to the above URL tells Dapr to execute the OnInvoke method via GRPC, which will run the HelloFromGo function. It is perfectly possible to include a payload in your POST and have the OnInvoke handler process that payload. The full example is here; it also includes sending and processing a JSON payload and sending back a text response. You will need to somewhat understand how GRPC works and also understand protocol buffers. A good book on GRPC is the following one: https://learning.oreilly.com/library/view/grpc-up-and/9781492058328/.

Conclusion

Dapr allows you to choose between HTTP and GRPC interfaces to interact with the runtime. You can choose whatever is most comfortable to you. One team can use HTTP with Python, JavaScript etc… while other teams use GRPC with their language of choice. Whatever you choose, the Dapr runtime will make sure service invocation just works allowing you to focus on the code.

Trying Consul Connect on your local machine

In a previous post, I talked about installing Consul on Kubernetes and using some of its features. In that post, I did not look at the service mesh functionality. Before looking at that, it is beneficial to try out the service mesh features on your local machine.

You can easily install Consul on your local machine with Chocolatey for Windows or Homebrew for Mac. On Windows, a simple choco install consul is enough. Since Consul is just a single executable, you can start it from the command line with all the options you need.

In the video below, I walk through configuring two services running as containers on my local machine: a web app that talks to Redis. We will “mesh” both services and then use an intention to deny service-to-service traffic.

Consul Service Mesh on your local machine… speed it up! ☺

In a later post and video, we will look at Consul Connect on Kubernetes. Stay tuned!

Getting started with Consul on Kubernetes

Although I have heard a lot about Hashicorp’s Consul, I have not had the opportunity to work with it and get acquainted with the basics. In this post, I will share some of the basics I have learned, hopefully giving you a bit of a head start when you embark on this journey yourself.

Want to watch a video about this instead?

What is Consul?

Basically, Consul is a networking tool. It provides service discovery and allows you to store and retrieve configuration values. On top of that, it provides service-mesh capability by controlling and encrypting service-to-service traffic. Although that looks simple enough, in complex and dynamic infrastructure spanning multiple locations such as on-premises and cloud, this can become extremely complicated. Let’s stick to the basics and focus on three things:

  • Installation on Kubernetes
  • Using the key-value store for configuration
  • Using the service catalog to retrieve service information

We will use a small Go program to illustrate the use of the Consul API. Let’s get started… 🚀🚀🚀

Installation of Consul

I will install Consul using the provided Helm chart. Note that the installation I will perform is great for testing but should not be used for production. In production, there are many more things to think about. Look at the configuration values for hints: certificates, storage size and class, options to enable/disable, etc… That being said, the chart does install multiple servers and clients to provide high availability.

I installed Consul with Pulumi and Python. You can check the code here. You can use that code on Azure to deploy both Kubernetes and Consul in one step. The section in the code that installs Consul is shown below:

consul = v3.Chart("consul",
    config=v3.LocalChartOpts(
        path="consul-chart",
        namespace="consul",
        values={ 
            "connectInject": {
                "enabled": "true"
            },
            "client": {
                "enabled": "true",
                "grpc": "true"
            },
            "syncCatalog": {
                "enabled": "true"
            } 
        }        
    ),
    opts=pulumi.ResourceOptions(
        depends_on=[ns_consul],
        provider=k8s
    )    
)

The code above would be equivalent to this Helm chart installation (Helm v3):

helm install consul -f consul-helm/values.yaml \
--namespace consul ./consul-helm \
--set connectInject.enabled=true  \
--set client.enabled=true --set client.grpc=true  \
--set syncCatalog.enabled=true

Connecting to the Consul UI

The chart installs Consul in the consul namespace. You can run the following command to get to the UI:

kubectl port-forward services/consul-consul-ui 8888:80 -n consul

You will see the screen below. The list of services depends on the Kubernetes services in your system.

Consul UI with list of services

The services above include consul itself. The consul service also has health checks configured. The other services in the screenshot are Kubernetes services that were discovered by Consul. I have installed Redis in the default namespace and exposed Redis via a service called redisapp. This results in a Consul service called redisapp-default. Later, we will query this service from our Go application.

When you click Key/Value, you can see the configured keys. I have created one key called REDISPATTERN which is later used in the Go program to know the Redis channels to subscribe to. It’s just a configuration value that is retrieved at runtime.

A simple key/value pair: REDISPATTERN=*

The Key/Value pair can be created via the consul CLI, the HTTP API or via the UI (Create button in the main Key/Value screen). I created the REDISPATTERN key via the Create button.

Querying the Key/Value store

Let’s turn our attention to writing some code that retrieves a Consul key at runtime. The question of course is: “how does your application find Consul?”. Look at the diagram below:

Simplified diagram of Consul installation on Kubernetes via the Helm chart

Above, you see the Consul server agents, implemented as a Kubernetes StatefulSet. Each server pod has a volume (Azure disk in this case) to store data such as key/value pairs.

Your application will not connect to these servers directly. Instead, it will connect via the client agents. The client agents are implemented as a DaemonSet resulting in a client agent per Kubernetes node. The client agent pods expose a static port on the Kubernetes host (yes, you read that right). This means that your app can connect to the IP address of the host it is running on. Your app can discover that IP address via the Downward API.

The container spec contains the following code:

      containers:
      - name: realtimeapp
        image: gbaeke/realtime-go-consul:1.0.0
        env:
        - name: HOST_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.hostIP
        - name: CONSUL_HTTP_ADDR
          value: http://$(HOST_IP):8500

The HOST_IP will be set to the IP of the Kubernetes host via a reference to status.hostIP. Next, the environment variable CONSUL_HTTP_ADDR is set to the full HTTP address including port 8500. In your code, you will need to read that environment variable.
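Reading that variable is simple in any language. As an illustration, here is a Python sketch that prefers CONSUL_HTTP_ADDR and falls back to building the address from HOST_IP and port 8500. The fallback and the function name consul_address are assumptions for this example, not something the Go program does.

```python
import os


def consul_address(env=None):
    """Return the Consul HTTP address based on the environment variables."""
    env = os.environ if env is None else env

    # the full address set in the container spec wins
    address = env.get("CONSUL_HTTP_ADDR")
    if address:
        return address

    # otherwise derive it from the host IP (assumed fallback)
    host_ip = env.get("HOST_IP")
    if not host_ip:
        raise RuntimeError("CONSUL_HTTP_ADDR and HOST_IP are not set")
    return f"http://{host_ip}:8500"


print(consul_address({"HOST_IP": "10.240.0.4"}))
# prints: http://10.240.0.4:8500
```

Passing the environment as a dictionary keeps the function easy to test outside of Kubernetes.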

Retrieving a key/value pair

Here is some code to read a Consul key/value pair with Go. Full source code is here.

// return a Consul client based on given address
func getConsul(address string) (*consulapi.Client, error) {
	config := consulapi.DefaultConfig()
	config.Address = address
	consul, err := consulapi.NewClient(config)
	return consul, err
}

// get key/value pair from Consul client and passed key name
func getKvPair(client *consulapi.Client, key string) (*consulapi.KVPair, error) {
	kv := client.KV()
	keyPair, _, err := kv.Get(key, nil)
	return keyPair, err
}

func main() {
        // retrieve address of Consul set via downward API in spec
	consulAddress := getEnv("CONSUL_HTTP_ADDR", "")
	if consulAddress == "" {
		log.Fatalf("CONSUL_HTTP_ADDR environment variable not set")
	}

        // get Consul client
	consul, err := getConsul(consulAddress)
	if err != nil {
		log.Fatalf("Error connecting to Consul: %s", err)
	}

        // get key/value pair with Consul client
	redisPattern, err := getKvPair(consul, "REDISPATTERN")
	if err != nil || redisPattern == nil {
		log.Fatalf("Could not get REDISPATTERN: %s", err)
	}
	log.Printf("KV: %v %s\n", redisPattern.Key, redisPattern.Value)

... func main() continued...

The comments in the code should be self-explanatory. When the REDISPATTERN key is not set or another error occurs, the program will exit. If REDISPATTERN is set, we can use the value later:
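The Go client is a wrapper around the Consul HTTP API, so it helps to know what happens underneath. A GET to /v1/kv/REDISPATTERN on the Consul address returns a JSON array in which the value is base64 encoded. The Python sketch below builds that URL and decodes a response. The helper names and the sample response are made up for illustration, and no request is sent.

```python
import base64
import json


def kv_url(address, key):
    """Build the Consul HTTP API URL for a key in the key/value store."""
    return f"{address.rstrip('/')}/v1/kv/{key}"


def decode_kv(response_text):
    """Decode the value of the first entry in a Consul KV response."""
    entries = json.loads(response_text)
    if not entries:
        return None

    # Consul returns the value base64 encoded; null means an empty value
    raw = entries[0].get("Value")
    if raw is None:
        return ""
    return base64.b64decode(raw).decode("utf-8")


# sample response for a key with the value * (base64: Kg==)
sample = '[{"Key": "REDISPATTERN", "Value": "Kg=="}]'
print(kv_url("http://10.240.0.4:8500", "REDISPATTERN"))
print(decode_kv(sample))
```

Consul answers with a 404 when the key does not exist, which this sketch does not handle.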

pubsub := client.PSubscribe(string(redisPattern.Value))

Looking up a service

That’s great but how do you look up an address of a service? That’s easy with the following basic code via the catalog:

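cat := consul.Catalog()
svc, _, err := cat.Service("redisapp-default", "", nil)
// stop when the lookup fails or the service is not in the catalog
if err != nil || len(svc) == 0 {
	log.Fatalf("Could not find service redisapp-default: %v", err)
}
log.Printf("Service address and port: %s:%d\n", svc[0].ServiceAddress,
	svc[0].ServicePort)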

consul is a *consulapi.Client obtained earlier. You use the Catalog() function to obtain access to catalog service functionality. In this case, we simply retrieve the address and port value of the Kubernetes service redisapp in the default namespace. We can use that information to connect to our Redis back-end.
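The same lookup is available over the HTTP API at /v1/catalog/service/redisapp-default, which returns a JSON array of service instances. As a sketch, the Python function below picks the address and port from such a response. The function name and the sample addresses are made up. Falling back to the node address when ServiceAddress is empty follows the Consul catalog documentation.

```python
import json


def service_endpoint(response_text):
    """Return (host, port) of the first instance in a catalog response."""
    entries = json.loads(response_text)
    if not entries:
        raise LookupError("service not found in catalog")

    first = entries[0]
    # ServiceAddress can be empty, in which case the node address applies
    host = first.get("ServiceAddress") or first["Address"]
    return host, first["ServicePort"]


sample = json.dumps([
    {"Address": "10.240.0.4", "ServiceAddress": "10.0.12.34", "ServicePort": 6379}
])
print(service_endpoint(sample))
# prints: ('10.0.12.34', 6379)
```

Just like in the Go code, only the first instance is used. A real application might want to pick from all returned instances.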

Conclusion

It’s easy to get started with Consul on Kubernetes and to write some code to take advantage of it. Be aware though that we only scratched the surface here and that this is both a sample deployment (without TLS, RBAC, etc…) and some sample code. In addition, you should only use Consul in more complex application landscapes with many services to discover, traffic to secure and more. If you do think you need it, you should also take a look at managed Consul on Azure. It runs in your subscription but is fully managed by Hashicorp! It can be integrated with Azure Kubernetes Service as well.

In a later post, I will take a look at the service mesh capabilities with Connect.

Progressive Delivery on Kubernetes: what are your options?

If you have ever deployed an application to Kubernetes, even a simple one, you are probably familiar with deployments. A deployment describes the pods to run, how many of them to run and how they should be upgraded. That last point is especially important because the strategy you select has an impact on the availability of the deployment. A deployment supports the following two strategies:

  • Recreate: all existing pods are killed and new ones are created; this obviously leads to some downtime
  • RollingUpdate: pods are gradually replaced which means there is a period when old and new pods coexist; this can result in issues for stateful pods or if there is no backward compatibility

But what if you want to use other methods such as BlueGreen or Canary? Although you could do that with a custom approach that uses deployments, there are some solutions that provide a more automated approach. Below, I discuss two of them briefly. The videos provide a more in-depth look.

Argo Rollouts

One of the solutions out there is Argo Rollouts. It is very easy to use. If you want to start slowly, with BlueGreen deployments and manual approval for instance, Argo Rollouts is recommended. It has a nice kubectl plugin and integration with Argo CD, a GitOps solution.

The following video demonstrates BlueGreen deployments:

BlueGreen deployments with Argo Rollouts

This video discusses a canary deployment with Argo Rollouts albeit a simple one without metric analysis:

Canary deployments with Argo Rollouts

This video shows the integration between Argo Rollouts and Argo CD:

Argo CD and Argo Rollouts integration

One thing to note is that, instead of a deployment, you will create a rollout object. It is easy to convert an existing deployment into a rollout. Other tools such as Flagger (see below), provide their functionality on top of an existing deployment.

For traffic splitting and metrics analysis, Argo Rollouts does not support Linkerd. More information about traffic splitting and management can be found here.

Flagger

Flagger, by Weaveworks, is another solution that provides BlueGreen and Canary deployment support to Kubernetes. In the video below, I demonstrate the basic look and feel of doing a canary deployment that includes metric analysis. Linkerd is used for gradual traffic shifting to the canary based on the built-in success rate metric of Linkerd:

Canary release with Flagger and Linkerd

If you want to get started with canary releases and easy traffic splitting and metrics, I suggest using the Flagger and Linkerd combination. This is based simply on the fact that Linkerd is much easier to install and use than Istio. Argo Rollouts in combination with Istio and Prometheus could be used to achieve exactly the same result.

Which one to use?

If you just want BlueGreen deployments with manual approvals, I would suggest using Argo Rollouts. When you integrate it with Argo CD, you can even use the Argo CD UI to promote your deployment. If you are comfortable with Istio and Prometheus, you can go a step further and add metrics analysis to automatically progress your deployment. You can also use a simple Kubernetes job to validate your deployment. Also, note that other metrics providers are supported.

Flagger supports more options for traffic splitting and metrics, due to its support for both Linkerd and Istio. Because Linkerd is so easy to use, Flagger is the simpler way to get started with canary releases and metrics analysis.

GitOps with Kubernetes: a better way to deploy?

I recently gave a talk at TechTrain, a monthly event in Mechelen (Belgium), hosted by Cronos. The talk is called “GitOps with Kubernetes: a better way to deploy” and is an introduction to GitOps with Weaveworks Flux as an example.

You can find a re-recording of the presentation on Youtube:

Writing a Kubernetes operator with Kopf

In today’s post, we will write a simple operator with Kopf, which is a Python framework created by Zalando. A Kubernetes operator is a piece of software, running in Kubernetes, that does something application specific. To see some examples of what operators are used for, check out operatorhub.io.

Our operator will do something simple in order to easily grasp how it works:

  • the operator will create a deployment that runs nginx
  • nginx will serve a static website based on a git repository that you specify; we will use an init container to grab the website from git and store it in a volume
  • you can control the number of instances via a replicas parameter

That’s great but how will the operator know when it has to do something, like creating or updating resources? We will use custom resources for that. Read on to learn more…

Note: source files are on GitHub

Custom Resource Definition (CRD)

Kubernetes allows you to define your own resources. We will create a resource of type (kind) DemoWeb. The CRD is created with the YAML below:

# A simple CRD to deploy a demo website from a git repo
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: demowebs.baeke.info
spec:
  scope: Namespaced
  group: baeke.info
  versions:
    - name: v1
      served: true
      storage: true
  names:
    kind: DemoWeb
    plural: demowebs
    singular: demoweb
    shortNames:
      - dweb
  additionalPrinterColumns:
    - name: Replicas
      type: string
      priority: 0
      JSONPath: .spec.replicas
      description: Amount of replicas
    - name: GitRepo
      type: string
      priority: 0
      JSONPath: .spec.gitrepo
      description: Git repository with web content

For more information (and there is a lot) about CRDs, see the documentation.

Once you create the above resource with kubectl apply (or create), you can create a custom resource based on the definition:

apiVersion: baeke.info/v1
kind: DemoWeb
metadata:
  name: demoweb1
spec:
  replicas: 2
  gitrepo: "https://github.com/gbaeke/static-web.git"

Note that we specified our own API group and version in the CRD (baeke.info/v1) and that we set the kind to DemoWeb. In the additionalPrinterColumns, we defined some properties that can be set in the spec and that will also be printed on screen. When you list resources of kind DemoWeb, you will see the Replicas and GitRepo columns:

Custom resources based on the DemoWeb CRD

Of course, creating the CRD and the custom resources is not enough. To actually create the nginx deployment when the custom resource is created, we need to write and run the operator.

Writing the operator

I wrote the operator on a Mac with Python 3.7.6 (64-bit). On Windows, for best results, make sure you use Miniconda instead of Python from the Windows Store. First install Kopf and the Kubernetes package:

pip3 install kopf kubernetes

Verify you can run kopf:

Running kopf

Let’s write the operator. You can find it in full here. Here’s the first part:

Naturally, we import kopf and other necessary packages. As noted before, kopf and kubernetes will have to be installed with pip. Next, we define a handler that runs whenever a resource of our custom type is spotted by the operator (with the @kopf.on.create decorator). The handler has two parameters:

  • spec object: allows us to retrieve our custom properties with spec.get (e.g. spec.get('replicas', 1); the second parameter is the default value)
  • **kwargs: a dictionary with lots of extra values we can use; we use it to retrieve the name of our custom resource (e.g. demoweb1); we can use that name to derive the name of our deployment and to set labels for our pods

Note: instead of using **kwargs to retrieve the name, you can also define an extra name parameter in the handler like so: def create_fn(spec, name, **kwargs); see the docs for more information
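To make the two parameters concrete, here is a minimal sketch that calls a handler-shaped function by hand. It does not import Kopf; the decorator in the comment and the body["metadata"]["name"] lookup are assumptions about how the real handler is wired up, and the returned dictionary is only there so the result can be inspected.

```python
# Stand-in for the handler Kopf would call. In the real operator it would be
# decorated with something like @kopf.on.create('baeke.info', 'v1', 'demowebs')
# (assumed here, based on the group, version and plural from the CRD).
def create_fn(spec, **kwargs):
    # Kopf passes the full resource body (among other things) as a keyword
    # argument, so the name can be read from its metadata.
    name = kwargs["body"]["metadata"]["name"]
    replicas = spec.get("replicas", 1)  # falls back to 1 when not set
    gitrepo = spec.get("gitrepo")       # None when not set
    return {"name": name, "replicas": replicas, "gitrepo": gitrepo}


# Simulate what the handler would receive for the demoweb1 resource,
# this time without a replicas value in the spec.
body = {
    "metadata": {"name": "demoweb1"},
    "spec": {"gitrepo": "https://github.com/gbaeke/static-web.git"},
}
result = create_fn(spec=body["spec"], body=body)
print(result)
```

Because replicas is missing from the simulated spec, spec.get returns the default of 1.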

Our deployment is just yaml stored in the doc variable with some help from the Python yaml package. We use spec.get and the name variable to customise it.
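As an illustration of that customisation step, the sketch below builds a comparable deployment as a plain Python dictionary instead of YAML text. The function name, the alpine/git and nginx image names, the volume name and the "-deployment" suffix are all assumptions for this example; the real operator may use different values.

```python
def build_deployment(name, spec):
    """Return a Deployment manifest (as a dict) for a DemoWeb resource."""
    replicas = spec.get("replicas", 1)
    gitrepo = spec.get("gitrepo")
    labels = {"app": name}  # label derived from the custom resource name

    return {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": f"{name}-deployment", "labels": labels},
        "spec": {
            "replicas": replicas,
            "selector": {"matchLabels": labels},
            "template": {
                "metadata": {"labels": labels},
                "spec": {
                    # The init container clones the site into a shared volume.
                    "initContainers": [{
                        "name": "git",
                        "image": "alpine/git",  # assumed image
                        "args": ["clone", "--single-branch", "--",
                                 gitrepo, "/repo"],
                        "volumeMounts": [{"name": "www", "mountPath": "/repo"}],
                    }],
                    # nginx serves the cloned content from the same volume.
                    "containers": [{
                        "name": "nginx",
                        "image": "nginx",
                        "ports": [{"containerPort": 80}],
                        "volumeMounts": [{
                            "name": "www",
                            "mountPath": "/usr/share/nginx/html",
                        }],
                    }],
                    "volumes": [{"name": "www", "emptyDir": {}}],
                },
            },
        },
    }


doc = build_deployment(
    "demoweb1",
    {"replicas": 2, "gitrepo": "https://github.com/gbaeke/static-web.git"},
)
print(doc["metadata"]["name"], doc["spec"]["replicas"])
```

A dictionary like this can be passed to the Kubernetes Python client as the deployment body, just like a dictionary loaded from YAML.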

After the doc variable, the following code completes the event handler:

The rest of the operator

With kopf.adopt, we make sure the deployment we create is a child of our custom resource. When we delete the custom resource, its children are also deleted.
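Kubernetes tracks this parent-child relationship through metadata.ownerReferences on the child object. The sketch below shows roughly what such a reference looks like; it is not Kopf's actual implementation, the function name is hypothetical, and the uid is a made-up placeholder.

```python
def add_owner_reference(child, owner):
    """Mark `owner` as the owner of `child` via metadata.ownerReferences."""
    ref = {
        "apiVersion": owner["apiVersion"],
        "kind": owner["kind"],
        "name": owner["metadata"]["name"],
        "uid": owner["metadata"]["uid"],
        "controller": True,
        "blockOwnerDeletion": True,
    }
    metadata = child.setdefault("metadata", {})
    metadata.setdefault("ownerReferences", []).append(ref)
    return child


# Hypothetical owner: the uid below is a placeholder, not a real value.
owner = {
    "apiVersion": "baeke.info/v1",
    "kind": "DemoWeb",
    "metadata": {"name": "demoweb1",
                 "uid": "00000000-0000-0000-0000-000000000000"},
}
child = add_owner_reference({"kind": "Deployment", "metadata": {}}, owner)
print(child["metadata"]["ownerReferences"])
```

With a reference like this in place, the Kubernetes garbage collector removes the deployment when the DemoWeb resource is deleted.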

Next, we simply use the kubernetes client to create a deployment via the apps/v1 API. The method create_namespaced_deployment takes two required parameters: the namespace and the deployment specification. Note that there is only minimal error checking here. There is much more you can do with regard to error checking, retries, etc…

Now we can run the operator with:

kopf run operator-filename.py

You can run this perfectly well on your local workstation if you have a working kube config pointing at a running cluster with the CRD installed. Kopf will automatically use that for authentication:

Running the operator on your workstation

Running the operator in your cluster

To run the operator in your cluster, create a Dockerfile that produces an image with Python, kopf, kubernetes and your operator in Python. In my case:

FROM python:3.7
RUN mkdir /src
ADD with_create.py /src
RUN pip install kopf
RUN pip install kubernetes
CMD kopf run /src/with_create.py --verbose

We added the verbose parameter for extra logging. Next, run the following commands to build and push the image (example with my image name):

docker build -t gbaeke/kopf-demoweb .
docker push gbaeke/kopf-demoweb

Now you can deploy the operator to the cluster:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: demowebs-operator
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      application: demowebs-operator
  template:
    metadata:
      labels:
        application: demowebs-operator
    spec:
      serviceAccountName: demowebs-account
      containers:
      - name: demowebs
        image: gbaeke/kopf-demoweb

The above is just a regular deployment but the serviceAccountName is extremely important. It gives kopf and your operator the required access rights to create the deployment in the target namespace. Check out the documentation to find out more about the creation of the service account and the required roles. Note that you should only run one instance of the operator!

Once the operator is deployed, you will see it running as a normal pod:

The operator is running

To see what is going on, check the logs. Let’s show them with octant:

Your operator logs

At the bottom, you see what happens when a creation event is detected for a resource of type DemoWeb. The spec is shown with the git repository and the number of replicas.

Now you can create resources of kind DemoWeb and see what happens. If you have your own git repository with some HTML in it, try to use that. Otherwise, just use mine at https://github.com/gbaeke/static-web.

Conclusion

Writing an operator is easy to do with the Kopf framework. Do note that we only touched on the basics to get started. We only have an on.create handler, and no on.update handler. So if you want to increase the number of replicas, you will have to delete the custom resource and create a new one. Based on the example though, it should be pretty easy to fix that. The git repo contains an example of an operator that also implements the on.update handler (with_update.py).
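To give an idea of what an update handler has to do, here is a minimal sketch of the patch it could compute when the replicas value changes. This is an assumption about one possible approach and not necessarily what with_update.py does; the function name is hypothetical.

```python
def replicas_patch(spec):
    """Build a patch body that only changes the replica count."""
    return {"spec": {"replicas": spec.get("replicas", 1)}}


# In the operator, a handler registered with @kopf.on.update could send this
# body to the cluster with the Kubernetes client's
# AppsV1Api().patch_namespaced_deployment(name, namespace, body).
patch = replicas_patch({"replicas": 3, "gitrepo": "https://example.invalid/repo.git"})
print(patch)
```

Only the fields present in the patch are changed, so the rest of the deployment (containers, volumes, labels) stays as it was created.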

Giving Argo CD a spin

If you have followed my blog a little, you have seen a few posts about GitOps with Flux CD. This time, I am taking a look at Argo CD which, like Flux CD, is a GitOps tool to deploy applications from manifests in a git repository.

Don’t want to read this whole thing?

Here’s the video version of this post

There are several differences between the two tools:

  • At first glance, Flux appears to use a single git repo for your cluster, whereas Argo immediately introduces the concept of apps. Each app can be connected to a different git repo. However, Flux can also use multiple git repositories in the same cluster. See https://github.com/fluxcd/multi-tenancy for more information.
  • Flux has the concept of workloads which can be automated. This means that image repositories are scanned for updates. When an update is available (say from tag v1.0.0 to v1.0.1), Flux will update your application based on filters you specify. As far as I can see, Argo requires you to drive the update from your CI process, which might be preferred.
  • By default, Argo deploys an administrative UI (next to a CLI) with a full view on your deployment and its dependencies
  • Argo supports RBAC and integrates with external identity providers (e.g. Azure Active Directory)

The Argo CD admin interface is shown below:

Argo CD admin interface… not too shabby

Let’s take a look at how to deploy Argo and deploy the app you see above. The app is deployed using a single yaml file. Nothing fancy yet such as kustomize or jsonnet.

Deployment

The getting started guide is pretty clear, so do have a look over there as well. To install, just run (with a deployed Kubernetes cluster and kubectl pointing at the cluster):

kubectl create namespace argocd 

kubectl apply -n argocd -f https://raw.githubusercontent.com/argoproj/argo-cd/stable/manifests/install.yaml

Note that I installed Argo CD on Azure (AKS).

Next, install the CLI. On a Mac, that is simple (with Homebrew):

brew tap argoproj/tap

brew install argoproj/tap/argocd

You will need access to the API server, which is not exposed over the Internet by default. For testing, port forwarding is easiest. In a separate shell, run the following command:

kubectl port-forward svc/argocd-server -n argocd 8080:443

You can now connect to https://localhost:8080 to get to the UI. You will need the admin password which, in this version of Argo CD, is initially the name of the argocd-server pod. Retrieve it by running:

kubectl get pods -n argocd -l app.kubernetes.io/name=argocd-server -o name | cut -d'/' -f 2

You can now log in to the UI with the user admin and the displayed password. You should also log in from the CLI and change the password with the following commands:

argocd login localhost:8080

argocd account update-password

Great! You are all set now to deploy an application.

Deploying an application

We will deploy an application that has a couple of dependencies. Normally, you would install those dependencies with Argo CD as well but since I am using a cluster that has these dependencies installed via Azure DevOps, I will just list what you need (Helm commands):

helm upgrade --namespace kube-system --install --set controller.service.loadBalancerIP=<IPADDRESS>,controller.publishService.enabled=true --wait nginx stable/nginx-ingress 

helm upgrade --namespace kube-system --install --values /home/vsts/work/1/s/externaldns/values.yaml --set cloudflare.apiToken=<CF_SECRET> --wait externaldns stable/external-dns

kubectl create ns cert-manager

helm upgrade --namespace cert-manager --install --wait --version v0.12.0 cert-manager jetstack/cert-manager

To learn more about these dependencies and how to use an Azure DevOps YAML pipeline to deploy them, see this post. If you want, you can skip the externaldns installation and create a DNS record yourself that resolves to the public IP address of Nginx Ingress. If you do not want to use an Azure static IP address, you can remove the loadBalancerIP parameter from the first command.

The manifests we will deploy with Argo CD can be found in the following public git repository: https://github.com/gbaeke/argo-demo. The application is in three YAML files:

  • Two YAML files that create a certificate cluster issuer based on custom resource definitions (CRDs) from cert-manager
  • realtime.yaml: Redis deployment, Redis service (ClusterIP), realtime web app deployment (based on this), realtime web app service (ClusterIP), ingress resource for https://real.baeke.info (record automatically created by externaldns)

It’s best that you fork my repo and modify realtime.yaml’s ingress resource with your own DNS name.

Create the Argo app

Now you can create the Argo app based on your forked repo. I used the following command with my original repo:

argocd app create realtime \
--repo https://github.com/gbaeke/argo-demo.git \
--path manifests \
--dest-server https://kubernetes.default.svc \
--dest-namespace default

The command above creates an app called realtime based on the specified repo. The app should use the manifests folder and apply (kubectl apply) all the manifests in that folder. The manifests are deployed to the cluster that Argo CD runs in. Note that you can run Argo CD in one cluster and deploy to totally different clusters.

The above command does not configure the repository to be synced automatically, although that is an option. To sync manually, use the following command:

argocd app sync realtime

The application should now be synced and viewable in the UI:

Application installed and synced

In my case, this results in the following application at https://real.baeke.info:

Not Secure because we use Let’s Encrypt staging for this app

Set up auto-sync

Let’s set up this app to automatically sync with the repo (default = every 3 minutes). This can be done from both the CLI and the UI. Let’s do it from the UI. Click on the app and then click App Details. You will find a Sync Policy in the app details where you can enable auto-sync.

Setting up auto-sync from the UI

You can now make changes to the git repo like changing the image tag for gbaeke/fluxapp (yes, I used this image with the Flux posts as well 😊 ) to 1.0.6 and wait for the sync to happen. Or sync manually from the CLI or the UI.

Conclusion

This was a quick tour of Argo CD. There is much more you can do but the above should get you started quickly. I must say I quite like the solution and am eager to see what the collaboration of Flux CD, Argo CD and Amazon comes up with in the future.

Kustomize and Flux

Flux has a feature called manifest generation that works together with Kustomize. Instead of just picking YAML files from a git repo and applying them, customisation is performed with the kustomize build command. The resulting YAML then gets applied to your cluster.

If you don’t know how customisation works (without Flux), take a look at the article I wrote earlier. Or look at the core docs.

You need to be aware of a few things before you get started. In order for Flux to use this method, you need to turn on manifest generation. With the Flux Helm chart, just pass the following parameter:

--set manifestGeneration=true

In my case, I have plain YAML files without customisation in a config folder. I want the files that use customisation in a different folder, say kustomize, like so:

Two folders to pass as git.path

To pass these folders to the Helm chart, use the following parameter:

--set git.path="config\,kustomize"

The kustomize folder contains the following files:

base files with environments dev and prod

There is nothing special about the base folder here. It is as explained in my previous post. The dev and prod folders are similar so I will focus only on dev.

The dev folder contains a .flux.yaml file, which is required by Flux. In this simple example, it contains the following:

version: 1
patchUpdated:
  generators:
    - command: kustomize build .
  patchFile: flux-patch.yaml

The file specifies the generator to use, in this case Kustomize. The kustomize executable is in the Flux image. I specify one patchFile which contains patches for several resources separated by ---:

---
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    flux.weave.works/automated: "true"
    flux.weave.works/tag.realtime: semver:~1
  name: realtime
  namespace: realtime-dev
spec:
  template:
    spec:
      $setElementOrder/containers:
      - name: realtime
      containers:
      - image: gbaeke/fluxapp:1.0.6
        name: realtime
---
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: realtime-ingress
  namespace: realtime-dev
spec:
  rules:
  - host: realdev.baeke.info
    http:
      paths:
      - backend:
          serviceName: realtime
          servicePort: 80
        path: /
  tls:
  - hosts:
    - realdev.baeke.info
    secretName: real-dev-baeke-info-tls

Above, you see the patches for the dev environment:

  • the workload should be automated by Flux, installing new images based on the semantic version filter ~1
  • the ingress should use host realdev.baeke.info with a different name for the secret as well (the secret will be created by cert-manager)
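To illustrate what a filter like semver:~1 selects, here is a small sketch of the matching logic. Flux does this with its own Go code; the Python below is only an approximation under the assumption that ~1 means "at least 1.0.0 and below 2.0.0", and it simply ignores tags that are not plain MAJOR.MINOR.PATCH versions (including pre-releases).

```python
import re

# Plain MAJOR.MINOR.PATCH, with an optional leading "v".
_SEMVER = re.compile(r"^v?(\d+)\.(\d+)\.(\d+)$")


def parse(tag):
    """Return (major, minor, patch) as ints, or None if the tag does not fit."""
    match = _SEMVER.match(tag)
    return tuple(int(part) for part in match.groups()) if match else None


def matches_tilde_major(tag, major=1):
    """True when the tag falls in the range >= major.0.0 and < (major+1).0.0."""
    version = parse(tag)
    return version is not None and version[0] == major


def newest_match(tags, major=1):
    """Pick the highest version among the tags that satisfy the filter."""
    candidates = [parse(t) for t in tags if matches_tilde_major(t, major)]
    if not candidates:
        return None
    return ".".join(str(part) for part in max(candidates))


tags = ["1.0.5", "1.0.6", "1.1.0", "2.0.0", "latest"]
print(newest_match(tags))
```

With these example tags, 2.0.0 and latest are skipped and 1.1.0 is selected as the newest image within major version 1.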

The prod folder contains a similar configuration. Perhaps naively, I thought that specifying the kustomize folder in git.path was sufficient for Flux to scan the folders and run customisation wherever a .flux.yaml file was found. Sadly, that is not the case. ☹️ With just the kustomize folder specified, Flux finds conflicts between the base, dev and prod folders because they contain similar files. That is expected behaviour for regular YAML files but, in my opinion, should not happen in this case. There is a somewhat clunky way to make this work though. Just specify the following as git.path:

--set git.path="config\,kustomize/dev\,kustomize/prod"

With the above parameter, Flux will find no conflicts and will happily apply the customisations.
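If you have many environment folders, the value for git.path can be generated instead of typed by hand. The sketch below walks a checkout of the repo, collects every folder under kustomize that contains a .flux.yaml file and joins them with the escaped commas that Helm's --set expects. The function name and the temporary folder layout are assumptions made for this example.

```python
import os
import tempfile


def flux_git_path(repo_root, plain_dirs, kustomize_root):
    """Build a git.path value: plain folders plus folders with a .flux.yaml."""
    found = []
    for dirpath, _dirnames, filenames in os.walk(os.path.join(repo_root, kustomize_root)):
        if ".flux.yaml" in filenames:
            relative = os.path.relpath(dirpath, repo_root)
            found.append(relative.replace(os.sep, "/"))
    # Commas must be escaped with a backslash inside a Helm --set value.
    return "\\,".join(list(plain_dirs) + sorted(found))


# Recreate the folder layout from this post in a temporary directory.
with tempfile.TemporaryDirectory() as repo:
    for folder in ("config", "kustomize/base", "kustomize/dev", "kustomize/prod"):
        os.makedirs(os.path.join(repo, *folder.split("/")))
    for env in ("dev", "prod"):
        open(os.path.join(repo, "kustomize", env, ".flux.yaml"), "w").close()

    value = flux_git_path(repo, ["config"], "kustomize")

print('--set git.path="{}"'.format(value))
```

The base folder is left out because it has no .flux.yaml file, which is exactly what avoids the conflicts described above.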

As a side note, you should also specify the namespace in the patch file explicitly. It is not added automatically even though kustomization.yaml contains the namespace.

Let’s look at the cluster when Flux has applied the changes.

Namespaces for dev and prod created via Flux & Kustomize

And here is the deployed “production app”:

Who chose that ugly colour!

The way customisations are handled could be improved. It’s unwieldy to specify every “customisation” folder in the git.path parameter. Just give me a --git-kustomize-path parameter and scan the paths in that parameter for .flux.yaml files. On the other hand, maybe I am missing something here so remarks are welcome.