Enhancing Semantic Search with a Streamlit UI

In a previous blog post, we discussed two Python programs, upload_vectors.py and search_vectors.py. These programs were used to create and search vectors, respectively. The upload_vectors.py script created vectors from chunks of a larger text and stored them in Pinecone, while the search_vectors.py script enabled semantic search on the text. In this blog post, we will discuss how to create a user interface (UI) for these two programs using Streamlit.

🚀 I kickstarted the Streamlit app by handing over the text-based version to ChatGPT and asking it to work its magic ✨💻. Yes, it was that easy! Afterwards, I made several manual changes to make it look the way I wanted.

Pinecone, Vectors, Embeddings, and Semantic Search: What’s all that about?

Pinecone is a vector database service that allows for easy storage and retrieval of high-dimensional vectors. It is optimized for similarity search, which makes it a perfect fit for tasks like semantic search. Our script stores vectors in Pinecone by parsing an RSS feed, chunking the blog posts, and creating the vectors with OpenAI’s embedding APIs.

Vectors are mathematical representations of data in the form of an array of numbers. In our case, we use vectors to represent chunks of text retrieved from blog posts. These vectors are generated using a process called embedding, which is a way of representing complex data, like text, in a lower-dimensional space while preserving the essential information.

Semantic search is a type of search that goes beyond keyword matching to understand the meaning and context of the query. By using vector embeddings, we can compare the similarity between queries and stored texts to find the most relevant results. Pinecone does that search for us and simply returns a number of matching chunks (pieces of text).
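To make the idea of comparing vectors a bit more concrete, here is a minimal sketch of a similarity search in plain Python. The three-dimensional vectors and the helper names (cosine_similarity, top_matches) are made up for illustration; real embeddings are much larger, and Pinecone performs this kind of ranking for us at scale.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def top_matches(query_vector, stored, k=2):
    """Return the k (text, score) pairs most similar to the query."""
    scored = [(text, cosine_similarity(query_vector, vec)) for text, vec in stored]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]


# Toy "embeddings", invented for this example only
stored_chunks = [
    ("Pinecone stores vectors", [0.9, 0.1, 0.0]),
    ("Streamlit builds UIs", [0.0, 0.2, 0.9]),
    ("Vector databases enable similarity search", [0.8, 0.3, 0.1]),
]
query = [1.0, 0.2, 0.0]

for text, score in top_matches(query, stored_chunks):
    print(f"{score:.3f}  {text}")
```

The two chunks that point in roughly the same direction as the query come out on top, while the unrelated Streamlit chunk is dropped.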

What is Streamlit?

Streamlit is a Python library that makes it easy to create custom web apps for machine learning and data science projects. You can build interactive UIs with minimal code, allowing you to focus on the core logic of your application.

Here’s an example of creating an extremely simple Streamlit app:

import streamlit as st

st.title('Hello, Streamlit!')
st.write('This is a simple Streamlit app.')

This code would generate a web app with a title and a text output. You can also create more complex UIs with user input, like sliders, text inputs, and buttons.

Creating a Streamlit UI for Semantic Search

Now let’s examine the provided code for creating a Streamlit UI for the search_vectors.py program. The code can be broken down into the following sections:

  1. Import necessary libraries and check environment variables.
  2. Set up the tokenizer and define the tiktoken_len function.
  3. Create the UI elements, including the title, text input, dropdown, sliders, and buttons.
  4. Define the main search functionality that is triggered when the user clicks the “Search” button.

Here is the full code:

import os
import pinecone
import openai
import tiktoken
import streamlit as st

# check environment variables
if os.getenv('PINECONE_API_KEY') is None:
    st.error("PINECONE_API_KEY not set. Please set this environment variable and restart the app.")
if os.getenv('PINECONE_ENVIRONMENT') is None:
    st.error("PINECONE_ENVIRONMENT not set. Please set this environment variable and restart the app.")
if os.getenv('OPENAI_API_KEY') is None:
    st.error("OPENAI_API_KEY not set. Please set this environment variable and restart the app.")

# use cl100k_base tokenizer for gpt-3.5-turbo and gpt-4
tokenizer = tiktoken.get_encoding('cl100k_base')


def tiktoken_len(text):
    tokens = tokenizer.encode(
        text,
        disallowed_special=()
    )
    return len(tokens)

# create a title for the app
st.title("Search blog feed 🔎")

# create a text input for the user query
your_query = st.text_input("What would you like to know?")
model = st.selectbox("Model", ["gpt-3.5-turbo", "gpt-4"])

with st.expander("Options"):

    max_chunks = 5
    if model == "gpt-4":
        max_chunks = 15

    max_reply_tokens = 1250
    if model == "gpt-4":
        max_reply_tokens = 2000

    col1, col2 = st.columns(2)

    # sliders for the number of chunks and the temperature
    with col1:
        chunks = st.slider("Number of chunks", 1, max_chunks, 5)
        temperature = st.slider("Temperature", 0.0, 1.0, 0.0)

    with col2:
        reply_tokens = st.slider("Reply tokens", 750, max_reply_tokens, 750)
    

# create a submit button
if st.button("Search"):
    # get the Pinecone API key and environment
    pinecone_api = os.getenv('PINECONE_API_KEY')
    pinecone_env = os.getenv('PINECONE_ENVIRONMENT')

    pinecone.init(api_key=pinecone_api, environment=pinecone_env)

    # set index
    index = pinecone.Index('blog-index')


    # vectorize your query with openai
    try:
        query_vector = openai.Embedding.create(
            input=your_query,
            model="text-embedding-ada-002"
        )["data"][0]["embedding"]
    except Exception as e:
        st.error(f"Error calling OpenAI Embedding API: {e}")
        st.stop()

    # search for the most similar vector in Pinecone
    search_response = index.query(
        top_k=chunks,
        vector=query_vector,
        include_metadata=True)

    # create a list of urls from search_response['matches']['metadata']['url']
    urls = [item["metadata"]['url'] for item in search_response['matches']]

    # make urls unique
    urls = list(set(urls))

    # create a list of texts from search_response['matches']['metadata']['text']
    chunk_texts = [item["metadata"]['text'] for item in search_response['matches']]

    # combine texts into one string to insert in prompt
    all_chunks = "\n".join(chunk_texts)

    # show urls of the chunks
    with st.expander("URLs", expanded=True):
        for url in urls:
            st.markdown(f"* {url}")
    

    with st.expander("Chunks"):
        for i, t in enumerate(chunk_texts):
            # count the tokens, then remove newlines from the chunk for display
            tokens = tiktoken_len(t)
            t = t.replace("\n", " ")
            st.write("Chunk ", i, "(Tokens: ", tokens, ") - ", t[:50] + "...")
    with st.spinner("Summarizing..."):
        try:
            prompt = f"""Answer the following query based on the context below ---: {your_query}
                                                        Do not answer beyond this context!
                                                        ---
                                                        {all_chunks}"""


            # openai chatgpt with article as context
            # chat api is cheaper than gpt: 0.002 / 1000 tokens
            response = openai.ChatCompletion.create(
                model=model,
                messages=[
                    { "role": "system", "content":  "You are a truthful assistant!" },
                    { "role": "user", "content": prompt }
                ],
                temperature=temperature,
                max_tokens=reply_tokens
            )

            st.markdown("### Answer:")
            st.write(response.choices[0]['message']['content'])

            with st.expander("More information"):
                st.write("Query: ", your_query)
                st.write("Full Response: ", response)

            with st.expander("Full Prompt"):
                st.write(prompt)

            st.balloons()
        except Exception as e:
            st.error(f"Error with OpenAI Completion: {e}")

A closer look

The code first imports the necessary libraries and checks if the required environment variables are set, displaying an error message if they are not. The libraries you need are in requirements.txt on GitHub. You can install them with:

pip3 install -r requirements.txt

ℹ️ I recommend using a Python virtual environment when you install these dependencies; see poetry (just one example)

The tiktoken_len function calculates the token length of a given text using the tokenizer. This is used to display the token count of each chunk of text we send to the ChatCompletion API. Depending on the model, 4096 (gpt-3.5-turbo) or 8192 (gpt-4) tokens are supported.
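The token counts can also be used to check up front whether a request is likely to fit. The sketch below is not part of the app; the function name fits_in_context and the 100-token allowance for the query and instructions are assumptions, and the limits are the ones mentioned above.

```python
# Context window sizes as mentioned in the text; verify them for the model you use
CONTEXT_LIMITS = {"gpt-3.5-turbo": 4096, "gpt-4": 8192}


def fits_in_context(model, chunk_tokens, reply_tokens, overhead_tokens=100):
    """Return True if the chunks plus the reserved reply fit in the model's window.

    chunk_tokens: token count per chunk (for example from tiktoken_len)
    overhead_tokens: assumed allowance for the query, instructions and message framing
    """
    needed = sum(chunk_tokens) + reply_tokens + overhead_tokens
    return needed <= CONTEXT_LIMITS[model]


# Five chunks of about 400 tokens each
print(fits_in_context("gpt-3.5-turbo", [400] * 5, 750))   # True (2850 tokens needed)
print(fits_in_context("gpt-3.5-turbo", [400] * 5, 2000))  # False (4100 tokens needed)
```

A check like this could run before the ChatCompletion call so the user gets a friendly warning instead of an API error.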

The UI is built using Streamlit functions, such as st.title, st.text_input, st.selectbox, and st.columns. These functions create various UI elements that the user can interact with to input their query and set search parameters. If you look at the code, you will see how easy it is to add those elements.

With the UI elements, you can set:

  • the number of text chunks to return from Pinecone and to forward to the ChatCompletion API (using st.slider)
  • the number of tokens to reply with (using st.slider)
  • the model: gpt-3.5-turbo or gpt-4 (ensure you have access to the gpt-4 API)
  • the temperature (using st.slider)

The options are shown in two columns with st.columns.

The main search functionality is triggered when the user clicks the “Search” button. The code then vectorizes the query, searches for the most similar vectors in Pinecone, and displays the URLs and chunks found. Finally, the selected model is used to generate an answer based on the chunks found and the user’s query. Often, gpt-4 will provide the best answer. It seems to be able to better understand all the chunks of text thrown at it.

Running the code

To run the code you need the following:

  • A Pinecone API key and environment
  • An OpenAI API key

It is easiest to run the code with Docker. If you have it installed, run the following command:

docker run -p 8501:8501 -e OPENAI_API_KEY="YOURKEY" \
    -e PINECONE_API_KEY="YOURKEY" \
    -e PINECONE_ENVIRONMENT="YOURENV" gbaeke/blogsearch

The gbaeke/blogsearch image is available on Docker Hub. You can also build your own with the Dockerfile provided on GitHub.

After running the image, go to http://localhost:8501 and first use the Upload page to create your Pinecone index and store vectors in it. You can use my blog’s feed or any other feed. You can experiment with the chunk size and chunk overlap.

Upload to Pinecone

You can add multiple RSS feeds one-by-one as long as you turn off Recreate index before each new upload. After you have populated the index, use the Search page to start searching:

Searching

Above, we ask what we can do with Pinecone and let gpt-4 do the answering. The similarity search will search for 5 similar items and return them. We show the original URLs these results come from. In the Chunks section, you can see the original chunks because they are also in Pinecone as metadata. After the answer, you can find the full JSON returned by the ChatCompletion API and the full prompt we sent to that API.
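One detail about the URL list: the code uses list(set(urls)) to remove duplicates, which does not keep the order of the matches. If you would rather show the URLs in ranking order, a small helper like the one below could be used instead. The helper name and the sample matches are made up for illustration.

```python
def unique_in_order(urls):
    """Remove duplicates while keeping the first occurrence of each URL."""
    return list(dict.fromkeys(urls))


# Sample matches in the shape returned by the query (values invented)
matches = [
    {"score": 0.91, "metadata": {"url": "https://example.com/post-a"}},
    {"score": 0.88, "metadata": {"url": "https://example.com/post-b"}},
    {"score": 0.85, "metadata": {"url": "https://example.com/post-a"}},
]

urls = unique_in_order([m["metadata"]["url"] for m in matches])
print(urls)
```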

Conclusion

In this blog post, we showed you how to create a Streamlit UI for the search_vectors.py script we talked about in a previous post. Streamlit allows you to easily build interactive web applications for your machine learning and data science projects. We also created a UI to upload posts to Pinecone. The full program allows you to add as much data as you want and query that data with semantic search, summarized and synthesized by the GPT model of choice. Give it a try and let me know what you think.

Enhancing Blog Post Search with Chunk-based Embeddings and Pinecone

In this blog post, we’ll show you a different approach to searching through a large database of blog posts. The previous approach involved creating a single embedding for the entire article and storing it in a vector database. The new approach is much more effective, and in this post, we’ll explain why and how to implement it.

The new approach involves the following steps:

  1. Chunk the article into pieces of about 400 tokens using LangChain
  2. Create an embedding for each chunk
  3. Store each embedding, along with its metadata such as the URL and the original text, in Pinecone
  4. Store the original text in Pinecone, but not indexed
  5. To search the blog posts, find the 5 best matching chunks and add them to the ChatCompletion prompt

We’ll explain each step in more detail below, but first, let’s start with a brief overview of the previous approach.

The previous approach used OpenAI’s embeddings API to vectorize the blog post articles and Pinecone, a vector database, to store and query the vectors. The article was vectorized as a whole, and the resulting vector was stored in Pinecone. To search the blog posts, cosine similarity was used to find the closest matching article, and the contents of the article were retrieved using the Python requests library and the BeautifulSoup library. Finally, a prompt was created for the ChatCompletion API, including the retrieved article.

The problem with this approach was that the entire article was vectorized as one piece. This meant that if the article was long, the vector might not represent the article accurately, as it would be too general. Moreover, if the article was too long, the ChatCompletion API call might fail because too many tokens were used.

The new approach solves these problems by chunking the article into smaller pieces, creating an embedding for each chunk, and storing each embedding in Pinecone. This way, we have a much more accurate representation of the article, as each chunk represents a smaller, more specific part of the article. And because each chunk is smaller, there is less risk of using too many tokens in the ChatCompletion API call.

To implement the new approach, we’ll use LangChain to chunk the article into pieces of about 400 tokens. LangChain is a library aimed at assisting in the development of applications that use LLMs, or large language models.
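To get a feel for what chunking with overlap does, here is a simplified sketch that splits on words instead of tokens. It is only a stand-in to illustrate the idea, not how LangChain's RecursiveCharacterTextSplitter works internally; the function name chunk_words is an assumption.

```python
def chunk_words(text, chunk_size=400, overlap=20):
    """Split text into chunks of at most chunk_size words.

    Consecutive chunks share `overlap` words so that a sentence cut at a
    chunk boundary still appears with some context in the next chunk.
    """
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")

    words = text.split()
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        # stop once the current chunk reaches the end of the text
        if start + chunk_size >= len(words):
            break
    return chunks


# A fake "article" of 1000 numbered words
article = " ".join(str(i) for i in range(1000))
chunks = chunk_words(article)
print(len(chunks), [len(c.split()) for c in chunks])
```

The real splitter measures length in tokens and prefers to break on paragraph and line boundaries, but the overlap principle is the same.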

Next, we’ll create an embedding for each chunk using OpenAI’s embeddings API. As before, we will use the text-embedding-ada-002 model. And once we have the embeddings, we’ll store each one, along with its metadata, in Pinecone. The key for each embedding will be a hash of the URL, combined with the chunk number.
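The key can be built with Python's hashlib, as in the small sketch below. The function name chunk_id and the example URL are only there for illustration.

```python
import hashlib


def chunk_id(url, chunk_number):
    """Build a deterministic vector ID from the post URL and the chunk number."""
    url_hash = hashlib.md5(url.encode("utf-8")).hexdigest()
    return url_hash + str(chunk_number)


first = chunk_id("https://example.com/some-post/", 0)
second = chunk_id("https://example.com/some-post/", 1)
print(first)
print(second)
```

Because the ID depends only on the URL and the chunk number, uploading the same post again overwrites the existing vectors instead of adding duplicates.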

The original text will also be stored in Pinecone, but not indexed, so that it can be retrieved later. With this approach, we do not need to retrieve a blog article from the web. Instead, we just get the text from Pinecone directly.

To search the blog posts, we’ll use cosine similarity to find the 5 best-matching chunks. The 5 best matching chunks will be added to the ChatCompletion prompt, allowing us to ask questions based on the article’s contents.

Uploading the embeddings

The code to upload the embeddings is shown below. You will need to set the following environment variables:

export OPENAI_API_KEY=your_openai_api_key
export PINECONE_API_KEY=your_pinecone_api_key
export PINECONE_ENVIRONMENT=your_pinecone_environment

import feedparser
import os
import pinecone
import openai
import requests
from bs4 import BeautifulSoup
from retrying import retry
from langchain.text_splitter import RecursiveCharacterTextSplitter
import tiktoken
import hashlib

# use cl100k_base tokenizer for gpt-3.5-turbo and gpt-4
tokenizer = tiktoken.get_encoding('cl100k_base')

# create the length function used by the RecursiveCharacterTextSplitter
def tiktoken_len(text):
    tokens = tokenizer.encode(
        text,
        disallowed_special=()
    )
    return len(tokens)

@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000)
def create_embedding(article):
    # vectorize with OpenAI text-embedding-ada-002
    embedding = openai.Embedding.create(
        input=article,
        model="text-embedding-ada-002"
    )

    return embedding["data"][0]["embedding"]

# OpenAI API key
openai.api_key = os.getenv('OPENAI_API_KEY')

# get the Pinecone API key and environment
pinecone_api = os.getenv('PINECONE_API_KEY')
pinecone_env = os.getenv('PINECONE_ENVIRONMENT')

pinecone.init(api_key=pinecone_api, environment=pinecone_env)

if "blog-index" not in pinecone.list_indexes():
    print("Index does not exist. Creating...")
    pinecone.create_index("blog-index", 1536, metadata_config= {"indexed": ["url", "chunk-id"]})
else:
    print("Index already exists. Deleting...")
    pinecone.delete_index("blog-index")
    print("Creating new index...")
    pinecone.create_index("blog-index", 1536, metadata_config= {"indexed": ["url", "chunk-id"]})

# set index; must exist
index = pinecone.Index('blog-index')

# URL of the RSS feed to parse
url = 'https://blog.baeke.info/feed/'

# Parse the RSS feed with feedparser
print("Parsing RSS feed: ", url)
feed = feedparser.parse(url)

# get number of entries in feed
entries = len(feed.entries)
print("Number of entries: ", entries)

# create recursive text splitter
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=400,
    chunk_overlap=20,  # number of tokens overlap between chunks
    length_function=tiktoken_len,
    separators=['\n\n', '\n', ' ', '']
)

pinecone_vectors = []
for i, entry in enumerate(feed.entries[:50]):
    # report progress
    print("Create embeddings for entry ", i, " of ", entries, " (", entry.link, ")")

    r = requests.get(entry.link)
    soup = BeautifulSoup(r.text, 'html.parser')
    article = soup.find('div', {'class': 'entry-content'}).text

    # create chunks
    chunks = text_splitter.split_text(article)

    # create md5 hash of entry.link
    url = entry.link
    url_hash = hashlib.md5(url.encode("utf-8"))
    url_hash = url_hash.hexdigest()
        
    # create embeddings for each chunk
    for j, chunk in enumerate(chunks):
        print("\tCreating embedding for chunk ", j, " of ", len(chunks))
        vector = create_embedding(chunk)

        # concatenate hash and j
        hash_j = url_hash + str(j)

        # add vector to pinecone_vectors list
        print("\tAdding vector to pinecone_vectors list for chunk ", j, " of ", len(chunks))
        pinecone_vectors.append((hash_j, vector, {"url": entry.link, "chunk-id": j, "text": chunk}))

        # upsert every 100 vectors
        if len(pinecone_vectors) % 100 == 0:
            print("Upserting batch of 100 vectors...")
            upsert_response = index.upsert(vectors=pinecone_vectors)
            pinecone_vectors = []

# if there are any vectors left, upsert them
if len(pinecone_vectors) > 0:
    print("Upserting remaining vectors...")
    upsert_response = index.upsert(vectors=pinecone_vectors)
    pinecone_vectors = []

print("Vector upload complete.")

Searching for blog posts

The code below is used to search blog posts:

import os
import pinecone
import openai
import tiktoken

# use cl100k_base tokenizer for gpt-3.5-turbo and gpt-4
tokenizer = tiktoken.get_encoding('cl100k_base')


def tiktoken_len(text):
    tokens = tokenizer.encode(
        text,
        disallowed_special=()
    )
    return len(tokens)

# get the Pinecone API key and environment
pinecone_api = os.getenv('PINECONE_API_KEY')
pinecone_env = os.getenv('PINECONE_ENVIRONMENT')

pinecone.init(api_key=pinecone_api, environment=pinecone_env)

# set index
index = pinecone.Index('blog-index')

while True:
    # set query
    your_query = input("\nWhat would you like to know? ")
    
    # vectorize your query with openai
    try:
        query_vector = openai.Embedding.create(
            input=your_query,
            model="text-embedding-ada-002"
        )["data"][0]["embedding"]
    except Exception as e:
        print("Error calling OpenAI Embedding API: ", e)
        continue

    # search for the most similar vector in Pinecone
    search_response = index.query(
        top_k=5,
        vector=query_vector,
        include_metadata=True)

    # create a list of urls from search_response['matches']['metadata']['url']
    urls = [item["metadata"]['url'] for item in search_response['matches']]

    # make urls unique
    urls = list(set(urls))

    # create a list of texts from search_response['matches']['metadata']['text']
    chunks = [item["metadata"]['text'] for item in search_response['matches']]

    # combine texts into one string to insert in prompt
    all_chunks = "\n".join(chunks)

    # print urls of the chunks
    print("URLs:\n\n", urls)

    # print the text number and first 50 characters of each text
    print("\nChunks:\n")
    for i, t in enumerate(chunks):
        print(f"\nChunk {i}: {t[:50]}...")

    try:
        # openai chatgpt with article as context
        # chat api is cheaper than gpt: 0.002 / 1000 tokens
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[
                { "role": "system", "content":  "You are a truthful assistant!" },
                { "role": "user", "content": f"""Answer the following query based on the context below ---: {your_query}
                                                    Do not answer beyond this context!
                                                    ---
                                                    {all_chunks}""" }
            ],
            temperature=0,
            max_tokens=750
        )

        print(f"\n{response.choices[0]['message']['content']}")
    except Exception as e:
        print(f"Error with OpenAI Completion: {e}")

In Action

Below, we ask if Redis supports storing vectors and what version of Redis we need in Azure. The Pinecone vector search found 5 chunks, all from the same blog post (there is only one URL). The five chunks are combined and sent to ChatGPT, together with the original question. The response from the ChatCompletion API is clear!

Example question and response

Conclusion

In conclusion, the “chunked” approach to searching through a database of blog posts is much more effective and solves many of the problems associated with the previous approach. We hope you found this post helpful, and we encourage you to try out the new approach in your own projects!

Pinecone and OpenAI magic: A guide to finding your long lost blog posts with vectorized search and ChatGPT

Searching through a large database of blog posts can be a daunting task, especially if there are thousands of articles. However, using vectorized search and cosine similarity, you can quickly query your blog posts and retrieve the most relevant content.

In this blog post, we’ll show you how to query a list of blog posts (from this blog) using a combination of vectorized search with cosine similarity and OpenAI ChatCompletions. We’ll be using OpenAI’s embeddings API to vectorize the blog post articles and Pinecone, a vector database, to store and query the vectors. We’ll also show you how to retrieve the contents of the article, create a prompt using the ChatCompletion API, and return the result to a web page.

ℹ️ Sample code is on GitHub: https://github.com/gbaeke/gpt-vectors

ℹ️ If you want an introduction to embeddings and cosine similarity, watch the video on YouTube by Part Time Larry.

Setting Up Pinecone

Before we can start querying our blog posts, we need to set up Pinecone. Pinecone is a vector database that makes it easy to store and query high-dimensional data. It’s perfect for our use case since we’ll be working with high-dimensional vectors.

ℹ️ Using a vector database is not strictly required. The GitHub repo contains app.py, which uses scikit-learn to create the vectors and perform a cosine similarity search. Many other approaches are possible. Pinecone just makes storing and querying the vectors super easy.

ℹ️ If you want more information about Pinecone and the concept of a vector database, watch this introduction video.

First, we’ll need to create an account with Pinecone and get the API key and environment name. In the Pinecone UI, you will find these as shown below. There will be a Show Key and Copy Key button in the Actions section next to the key.

Key and environment for Pinecone

Once we have an API key and the environment, we can use the Pinecone Python library to create and use indexes. Install the Pinecone library with pip install pinecone-client.

Although you can create a Pinecone index from code, we will create the index in the Pinecone portal. Go to Indexes and select Create Index. Create the index using cosine as metric and 1536 dimensions:

blog-index in Pinecone

The embedding model we will use to create the vectors, text-embedding-ada-002, outputs vectors with 1536 dimensions. For more info see OpenAI’s blog post of December 15, 2022.

To use the Pinecone index from code, look at the snippet below:

import pinecone

pinecone_api = "<your_api_key>"
pinecone_env = "<your_environment>"

pinecone.init(api_key=pinecone_api, environment=pinecone_env)

index = pinecone.Index('blog-index')

We create an instance of the Index class with the name “blog-index” and store this in index. This index will be used to store our blog post vectors or to perform searches on.

Vectorizing Blog Posts with OpenAI’s Embeddings API

Next, we’ll need to vectorize our blog post articles. We’ll be using OpenAI’s embeddings API to do this. The embeddings API takes a piece of text and returns a high-dimensional vector representation of that text. Here’s an example of how to do that for one article or string:

import openai

openai.api_key = "<your_api_key>"

article = "Some text from a blog post"

vector = openai.Embedding.create(
    input=article,
    model="text-embedding-ada-002"
)["data"][0]["embedding"]

We create a vector representation of our blog post article by calling the Embedding class’s create method. We pass in the article text as input and the text-embedding-ada-002 model, which is a pre-trained language model that can generate high-quality embeddings.

Storing Vectors in Pinecone

Once we have the vector representations of our blog post articles, we can store them in Pinecone. Instead of storing vectors one at a time, we can use upsert to store a list of vectors. The code below uses the feed of this blog to grab the URLs for 50 posts. Every post is vectorized, and the vector is added to a Python list of tuples, as expected by the upsert method. The list is then added to Pinecone in one call. The tuple that Pinecone expects is:

(id, vector, metadata dictionary)

e.g. ("0", vector for post 1, {"url": url to post 1})
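For 50 vectors, a single upsert call works fine. With a much longer list, you may prefer to send the tuples in smaller batches. The sketch below shows one way to slice the list; the helper name batched and the batch size of 100 are assumptions, and the vectors are placeholders.

```python
def batched(items, batch_size=100):
    """Yield consecutive slices of items with at most batch_size elements each."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


# Placeholder tuples in the (id, vector, metadata) shape
vectors = [
    (str(i), [0.0] * 4, {"url": f"https://example.com/post-{i}"})
    for i in range(250)
]

for batch in batched(vectors):
    # in real code, this is where index.upsert(vectors=batch) would go
    print(len(batch))  # prints 100, 100, 50
```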

Here is the code that uploads the first 50 posts of baeke.info to Pinecone. You need to set the Pinecone key and environment and the OpenAI key as environment variables. The code uses feedparser to grab the blog feed, and BeautifulSoup to parse the retrieved HTML. The code serves as an example only. It is not very robust when it comes to error checking.

import feedparser
import os
import pinecone
import openai
import requests
from bs4 import BeautifulSoup

# OpenAI API key
openai.api_key = os.getenv('OPENAI_API_KEY')

# get the Pinecone API key and environment
pinecone_api = os.getenv('PINECONE_API_KEY')
pinecone_env = os.getenv('PINECONE_ENVIRONMENT')

pinecone.init(api_key=pinecone_api, environment=pinecone_env)

# set index; must exist
index = pinecone.Index('blog-index')

# URL of the RSS feed to parse
url = 'https://blog.baeke.info/feed/'

# Parse the RSS feed with feedparser
feed = feedparser.parse(url)

# get number of entries in feed
entries = len(feed.entries)
print("Number of entries: ", entries)

pinecone_vectors = []
for i, entry in enumerate(feed.entries[:50]):
    # report progress
    print("Processing entry ", i, " of ", entries)

    r = requests.get(entry.link)
    soup = BeautifulSoup(r.text, 'html.parser')
    article = soup.find('div', {'class': 'entry-content'}).text

    # vectorize with OpenAI text-embedding-ada-002
    embedding = openai.Embedding.create(
        input=article,
        model="text-embedding-ada-002"
    )

    # extract the embedding (length = 1536)
    vector = embedding["data"][0]["embedding"]

    # append tuple to pinecone_vectors list
    pinecone_vectors.append((str(i), vector, {"url": entry.link}))

# all vectors can be upserted to pinecone in one go
upsert_response = index.upsert(vectors=pinecone_vectors)

print("Vector upload complete.")

Querying Vectors with Pinecone

Now that we have stored our blog post vectors in Pinecone, we can start querying them. We’ll use cosine similarity to find the closest matching blog post. Here is some code that does just that:

def get_highest_score_url(items):
    # no matches at all: nothing to return
    if not items:
        return ""

    highest_score_item = max(items, key=lambda item: item["score"])

    # only accept the best match if it is similar enough
    if highest_score_item["score"] > 0.8:
        return highest_score_item["metadata"]['url']
    else:
        return ""

query_vector = <vector representation of query>  # vector created with OpenAI as well

search_response = index.query(
    top_k=5,
    vector=query_vector,
    include_metadata=True
)

url = get_highest_score_url(search_response['matches'])

We create a vector representation of our query (you don’t see that here but it’s the same code used to vectorize the blog posts) and pass it to the query method of the Pinecone Index class. We set top_k=5 to retrieve the top 5 matching blog posts. We also set include_metadata=True to include the metadata associated with each vector in our response. That way, we also have the URL of the top 5 matching posts.

The query method returns a dictionary that contains a matches key. The matches value is a list of dictionaries, with each dictionary representing a matching blog post. The score key in each dictionary represents the cosine similarity score between the query vector and the blog post vector. We use the get_highest_score_url function to find the blog post with the highest cosine similarity score.

The function contains some code to only return the highest scoring URL if the score is > 0.8. It’s of course up to you to accept lower matching results. There is a potential for the vector query to deliver an article that’s not highly relevant which results in an irrelevant context for the OpenAI ChatCompletion API call we will do later.
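If you want to experiment with the threshold, a variation that returns every URL above it (instead of only the best one) can be handy. This is a sketch with invented sample data; the function name urls_above_threshold is not part of the sample code.

```python
def urls_above_threshold(matches, threshold=0.8):
    """Return the URLs of all matches scoring above threshold, best first."""
    good = [m for m in matches if m["score"] > threshold]
    good.sort(key=lambda m: m["score"], reverse=True)
    return [m["metadata"]["url"] for m in good]


# Sample matches in the shape described above (values invented)
sample_matches = [
    {"score": 0.79, "metadata": {"url": "https://example.com/post-a"}},
    {"score": 0.86, "metadata": {"url": "https://example.com/post-b"}},
    {"score": 0.91, "metadata": {"url": "https://example.com/post-c"}},
]

print(urls_above_threshold(sample_matches))
print(urls_above_threshold(sample_matches, threshold=0.95))
```

With the default threshold, only post-c and post-b are returned. Raising the threshold to 0.95 returns an empty list, which you can use as a signal to tell the user that nothing relevant was found.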

Retrieving the Contents of the Blog Post

Once we have the URL of the closest matching blog post, we can retrieve the contents of the article using the Python requests library and the BeautifulSoup library.

import requests
from bs4 import BeautifulSoup

r = requests.get(url)
soup = BeautifulSoup(r.text, 'html.parser')

article = soup.find('div', {'class': 'entry-content'}).text

We send a GET request to the URL of the closest matching blog post and retrieve the HTML content. We use the BeautifulSoup library to parse the HTML and extract the contents of the <div> element with the class “entry-content”.

Creating a Prompt for the ChatCompletion API

Now that we have the contents of the blog post, we can create a prompt for the ChatCompletion API. The crucial part here is that our OpenAI query should include the blog post we just retrieved!

response = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=[
        { "role": "system", "content": "You are a polite assistant" },
        { "role": "user", "content": "Based on the article below, answer the following question: " + your_query +
            "\nAnswer as follows:" +
            "\nHere is the answer directly from the article:" +
            "\nHere is the answer from other sources:" +
             "\n---\n" + article }
           
    ],
    temperature=0,
    max_tokens=200
)

response_text=f"\n{response.choices[0]['message']['content']}"

We use the ChatCompletion API with the gpt-3.5-turbo model to ask our question. This is the same as using ChatGPT on the web with that model. At the time of writing, the GPT-4 model was not yet available.

Instead of one prompt, we send a number of dictionaries in a messages list. The first item in the list sets the system message. The second item is the actual user question. We ask to answer the question based on the blog post we stored in the article variable and we provide some instructions on how to answer. We add the contents of the article to our query.
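Building the messages list can be pulled into a small function, which makes it easier to try different prompts. The sketch below is a simplified version of the prompt above; the function name build_messages is an assumption.

```python
def build_messages(question, article, system_prompt="You are a polite assistant"):
    """Return the messages list for a chat completion request.

    The article is appended after a separator so the model can tell
    the question and the context apart.
    """
    user_content = (
        "Based on the article below, answer the following question: "
        + question
        + "\n---\n"
        + article
    )
    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_content},
    ]


messages = build_messages("Does Redis support vectors?", "Some article text")
for message in messages:
    print(message["role"], "->", message["content"][:40])
```

The returned list can be passed as the messages argument of the ChatCompletion call.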

If the article is long, you run the risk of using too many tokens. If that happens, the ChatCompletion call will fail. You can use the tiktoken library to count the tokens and prevent the call from happening in the first place. Or you can catch the exception and tell the user. In the above code, there is no error handling. We only include the core code that’s required.
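Another option is to shorten the article until it fits. The sketch below trims a text at word boundaries to a token budget. The function name truncate_to_budget is hypothetical, and the word counter is only a stand-in so the example runs without extra libraries; in practice you would pass a tiktoken-based length function. The search assumes that a longer prefix never has fewer tokens than a shorter one.

```python
def truncate_to_budget(text, max_tokens, count_tokens):
    """Cut text at a word boundary so that count_tokens(result) <= max_tokens."""
    words = text.split()
    low, high = 0, len(words)
    # binary search for the largest number of leading words that still fits
    while low < high:
        mid = (low + high + 1) // 2
        if count_tokens(" ".join(words[:mid])) <= max_tokens:
            low = mid
        else:
            high = mid - 1
    return " ".join(words[:low])


def word_count(text):
    """Stand-in for a real token counter: counts whitespace-separated words."""
    return len(text.split())


article = "one two three four five six seven eight"
print(truncate_to_budget(article, 5, word_count))
```

Keep in mind that cutting off the end of an article may remove exactly the part that answers the question, so telling the user about the truncation is a good idea.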

Returning the Result to a Web Page

If you are running the search code in an HTTP handler as the result of the user typing a query in a web page, you can return the result to the caller:

return jsonify({
    'url': url,
    'response': response_text
})

The full example, including an HTML page and Flask code can be found on GitHub.

The result could look like this:

Query results in the closest URL using vectorized search and ChatGPT answering the question based on the contents the URL points at

Conclusion

Using vectorized search and cosine similarity, we can quickly query a database of blog posts and retrieve the most relevant post. By combining OpenAI’s embeddings API, Pinecone, and the ChatCompletion API, we can create a powerful tool for searching and retrieving blog post content using natural language.

Note that there are some potential issues as well. The code we show is merely a starting point:

  • Limitations of cosine similarity: it does not take into account all properties of the vectors, which can lead to misleading results
  • Prompt engineering: the prompt we use works but there might be prompts that just work better. Experimentation with different prompts is crucial!
  • Embeddings: OpenAI embeddings are trained on a large corpus of text, which may not be representative of the domain-specific language in the posts
  • Performance might not be sufficient if the size of the database grows large. For my blog, that’s not really an issue. 😀
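To make the first limitation a bit more concrete, here is a minimal sketch of cosine similarity in plain Python. The vectors are made up for illustration. It shows that the measure only looks at the direction of the vectors and ignores their magnitude.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


short = [1.0, 2.0, 3.0]
long = [10.0, 20.0, 30.0]   # same direction, ten times the magnitude
other = [3.0, 2.0, 1.0]     # same magnitude as short, different direction

print(cosine_similarity(short, long))
print(cosine_similarity(short, other))
```

The first pair scores as a perfect match even though the vectors differ in length, which is exactly the property that can hide differences between embeddings.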

Step-by-Step Guide: How to Build Your Own Chatbot with the ChatGPT API

In this blog post, we will be discussing how to build your own chat bot using the ChatGPT API. It’s worth mentioning that we will be using the OpenAI APIs directly and not the Azure OpenAI APIs, and the code will be written in Python. A crucial aspect of creating a chat bot is maintaining context in the conversation, which we will achieve by storing and sending previous messages to the API at each request. If you are just starting with AI and chat bots, this post will guide you through the step-by-step process of building your own simple chat bot using the ChatGPT API.

Python setup

Ensure Python is installed. I am using version 3.10.8. For editing code, I am using Visual Studio Code. For the text-based chat bot, you will need the following Python packages:

  • openai: make sure the version is 0.27.0 or higher; earlier versions do not support the ChatCompletion APIs
  • tiktoken: a library to count the number of tokens of your chat bot messages

Install the above packages with your package manager. For example: pip install openai tiktoken.

All code can be found on GitHub.

Getting an account at OpenAI

We will write a text-based chat bot that asks for user input indefinitely. The first thing you need to do is sign up for API access at https://platform.openai.com/signup. Access is not free but for personal use, while writing and testing the chat bot, the price will be very low. Here is a screenshot from my account:

Oh no, $0.13 dollars

When you have your account, generate an API key from https://platform.openai.com/account/api-keys. Click the Create new secret key button and store the key somewhere.

Writing the bot

Now create a new Python file called app.py and add the following lines:

import os
import openai
import tiktoken

openai.api_key = os.getenv("OPENAI_KEY")

We already discussed the openai and tiktoken libraries. We will also use the built-in os library to read environment variables.

In the last line, we read the environment variable OPENAI_KEY. If you use Linux, in your shell, use the following command to store the OpenAI key in an environment variable: export OPENAI_KEY=your-OpenAI-key. We use this approach to avoid storing the API key in your code and accidentally uploading it to GitHub.
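If the variable is not set, os.getenv quietly returns None and the problem only surfaces at the first API call. A small sketch of a stricter approach follows. The function name read_api_key is hypothetical and not part of the original code.

```python
import os


def read_api_key(var_name: str = "OPENAI_KEY") -> str:
    """Return the API key from the environment or raise a clear error."""
    key = os.getenv(var_name)
    if not key:
        raise RuntimeError(f"Environment variable {var_name} is not set")
    return key
```

You would then write openai.api_key = read_api_key() instead of calling os.getenv directly.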

To implement the core chat functionality, we will use a Python class. I was following a Udemy course about ChatGPT and it used a similar approach, which I liked. By the way, I can highly recommend that course. Check it out here.

Let’s start with the class constructor:

class ChatBot:

    def __init__(self, message):
        self.messages = [
            { "role": "system", "content": message }
        ]

In the constructor, we define a messages list and set the first item in that list to a configurable dictionary: { "role": "system", "content": message }. In the ChatGPT API calls, the messages list provides context to the API because it contains all the previous messages. With this initial system message, we can instruct the API to behave in a certain way. For example, later in the code, you will find this code to create an instance of the ChatBot class:

bot = ChatBot("You are an assistant that always answers correctly. If not sure, say 'I don't know'.")

But you could also do:

bot = ChatBot("You are an assistant that always answers wrongly. Always contradict the user")

In practice, ChatGPT does not follow the system instruction too strongly. User messages are more important. So it could be that, after some back and forth, the answers will not follow the system instruction anymore.

Let’s continue with another method in the class, the chat method:

    def chat(self):
        prompt = input("You: ")
        
        self.messages.append(
            { "role": "user", "content": prompt}
        )
        
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages = self.messages,
            temperature = 0.8
        )
        
        answer = response.choices[0]['message']['content']
        
        print(answer)
        
        self.messages.append(
           { "role": "assistant", "content": answer} 
        )

        tokens = self.num_tokens_from_messages(self.messages)
        print(f"Total tokens: {tokens}")

        if tokens > 4000:
            print("WARNING: Number of tokens exceeds 4000. Truncating messages.")
            self.messages = self.messages[2:]

The chat method is where the action happens. It does the following:

  • It prompts the user to enter some input.
  • The user’s input is stored in a dictionary as a message with a “user” role and appended to a list of messages called self.messages. If this is the first input, we now have two messages in the list, a system message and a user message.
  • It then creates a response using OpenAI’s gpt-3.5-turbo model, passing in the self.messages list and a temperature of 0.8 as parameters. We use the ChatCompletion API rather than the Completion API that you use with other models such as text-davinci-003.
  • The generated response is stored in a variable named answer. The full response contains a lot of information. We are only interested in the first response (there is only one) and grab the content.
  • The answer is printed to the console.
  • The answer is also added to the self.messages list as a message with an “assistant” role. If this is the first input, we now have three messages in the list: a system message, the first user message (the input) and the assistant’s response.
  • The total number of tokens in the self.messages list is computed using a separate function called num_tokens_from_messages() and printed to the console.
  • If the number of tokens exceeds 4000, a warning message is printed and the self.messages list is truncated to remove the first two messages. We will talk about these tokens later.
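One thing to be aware of: the first time truncation happens, the slice self.messages[2:] in the chat method also removes the system message, because it is the first item in the list. If you want to keep the system instruction around, a variant like the sketch below could be used instead. The helper name trim_history is hypothetical and not part of the original code.

```python
def trim_history(messages: list[dict], drop: int = 2) -> list[dict]:
    """Drop the oldest non-system messages, keeping the system message first."""
    if messages and messages[0]["role"] == "system":
        return [messages[0]] + messages[1 + drop:]
    return messages[drop:]


history = [
    {"role": "system", "content": "You are an assistant."},
    {"role": "user", "content": "First question"},
    {"role": "assistant", "content": "First answer"},
    {"role": "user", "content": "Second question"},
    {"role": "assistant", "content": "Second answer"},
]
trimmed = trim_history(history)
print([m["role"] for m in trimmed])
```

In the chat method, this would replace the slice with self.messages = trim_history(self.messages).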

It’s important to realize we are using the Chat completions API here. You can find more information about Chat completions here.

If you did not quite get how the text response gets extracted, here is an example of a full response from the Chat completion API:

{
 'id': 'chatcmpl-6p9XYPYSTTRi0xEviKjjilqrWU2Ve',
 'object': 'chat.completion',
 'created': 1677649420,
 'model': 'gpt-3.5-turbo',
 'usage': {'prompt_tokens': 56, 'completion_tokens': 31, 'total_tokens': 87},
 'choices': [
   {
    'message': {
      'role': 'assistant',
      'content': 'The 2020 World Series was played in Arlington, Texas at the Globe Life Field, which was the new home stadium for the Texas Rangers.'},
    'finish_reason': 'stop',
    'index': 0
   }
  ]
}

The response is indeed in choices[0]['message']['content'].
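To see the lookup in isolation, here is a small sketch that uses a plain dictionary shaped like the response above. The shortened answer text is made up, and the real library returns its own response object rather than a plain dict, so this only illustrates the structure.

```python
response = {
    "choices": [
        {
            "message": {"role": "assistant", "content": "Arlington, Texas."},
            "finish_reason": "stop",
            "index": 0,
        }
    ],
    "usage": {"prompt_tokens": 56, "completion_tokens": 31, "total_tokens": 87},
}

# First (and only) choice, then the message, then its text content.
answer = response["choices"][0]["message"]["content"]
# The usage block reports how many tokens the call consumed.
total_tokens = response["usage"]["total_tokens"]
print(answer, total_tokens)
```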

To make this rudimentary chat bot work, we will repeatedly call the chat method like so:

bot = ChatBot("You are an assistant that always answers correctly. If not sure, say 'I don't know'.")
while True:
    bot.chat()

Every time you input a question, the API answers and both the question and the answer are added to the messages list. Of course, that makes the messages list grow larger and larger, up to a point where it gets too large. The question is: “What is too large?”. Let’s answer that in the next section.

Counting tokens

Language models do not work with text as humans do. Instead, they use tokens. It’s not important exactly how this works, but it is important to know that you get billed based on these tokens. You pay per token.

In addition, the model we use here (gpt-3.5-turbo) has a maximum limit of 4096 tokens. This might change in the future. With our code, we cannot keep adding messages to the messages list because, eventually, we will pass the limit and the API call will fail.

To have an idea about the tokens in our messages list, we have this function:

    def num_tokens_from_messages(self, messages, model="gpt-3.5-turbo"):
        try:
            encoding = tiktoken.encoding_for_model(model)
        except KeyError:
            encoding = tiktoken.get_encoding("cl100k_base")
        if model == "gpt-3.5-turbo":  # note: future models may deviate from this
            num_tokens = 0
            for message in messages:
                num_tokens += 4  # every message follows <im_start>{role/name}\n{content}<im_end>\n
                for key, value in message.items():
                    num_tokens += len(encoding.encode(value))
                    if key == "name":  # if there's a name, the role is omitted
                        num_tokens += -1  # role is always required and always 1 token
            num_tokens += 2  # every reply is primed with <im_start>assistant
            return num_tokens
        else:
            raise NotImplementedError(f"""num_tokens_from_messages() is not presently implemented for model {model}.""")

The above function comes from the OpenAI cookbook on GitHub. In my code, the function is used to count tokens in the messages list and, if the number of tokens is above a certain limit, we remove the first two messages from the list. The code also prints the tokens so you know how many you will be sending to the API.

The function contains references to <im_start> and <im_end>. This is ChatML and is discussed here. Because you use the ChatCompletion API, you do not have to worry about this. You just use the messages list and the API will transform it all to ChatML. But when you count tokens, ChatML needs to be taken into account for the total token count.
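To see how the ChatML overhead adds up, the sketch below repeats the bookkeeping of num_tokens_from_messages with a pluggable encoder. The name count_message_tokens is hypothetical, and str.split is only a toy encoder (one "token" per word), so the result is not a real token count. It just shows the arithmetic.

```python
from typing import Callable


def count_message_tokens(messages: list[dict],
                         encode: Callable[[str], list]) -> int:
    """Same bookkeeping as num_tokens_from_messages, with a pluggable encoder."""
    total = 0
    for message in messages:
        total += 4  # per-message ChatML framing
        for key, value in message.items():
            total += len(encode(value))
            if key == "name":
                total -= 1  # when a name is present, the role is omitted
    return total + 2  # the reply is primed with the assistant marker


messages = [
    {"role": "system", "content": "You are helpful"},
    {"role": "user", "content": "Hi there"},
]
# str.split is a toy encoder: one "token" per whitespace-separated word.
print(count_message_tokens(messages, str.split))
```

With the toy encoder, the first message costs 4 + 1 + 3 and the second 4 + 1 + 2, plus 2 for the primed reply. Swapping in encoding.encode from tiktoken gives the real count.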

Note that Microsoft examples for Azure OpenAI do use ChatML in the prompt, in combination with the default Completion APIs. See Microsoft Learn for more information. You will quickly see that using the ChatCompletion API with the messages list is much simpler.

To see, and download, the full code, see GitHub.

Running the code

To run the code, just run app.py. On my system, I need to use python3 app.py. I set the system message to You are an assistant that always answers wrongly. Contradict the user. 😀

Here’s an example conversation:

Although, at the start, the responses follow the system message, the assistant starts to correct itself and answers correctly. As stated, user messages eventually carry more weight.

Summary

In this post, we discussed how to build a chat bot using the ChatGPT API and Python. We went through the setup process, created an OpenAI account, and wrote the chat bot code using the OpenAI API. The bot used the ChatCompletion API and maintained context in the conversation by storing and sending previous messages to the API at each request. We also discussed counting tokens and truncating the message list to avoid exceeding the maximum token limit for the model. The full code is available on GitHub, and we provided an example conversation between the bot and the user. The post aims to guide beginning developers, and beginners in AI and chat bot development, through the step-by-step process of building their own chat bot with the ChatGPT API while keeping things as simple as possible.

Hope you liked it!

Authenticate to Azure Resources with Azure Managed Identities

In this post, we will take a look at managed identities in general and system-assigned managed identity in particular. Managed identities can be used by your code to authenticate to Azure AD resources from Azure compute resources that support it, like virtual machines and containers.

But first, let’s look at the other option and why you should avoid it if you can: service principals.

Service Principals

If you have code that needs to authenticate to Azure AD-protected resources such as Azure Key Vault, you can always create a service principal. It’s the option that always works. It has some caveats that will be explained further in this post.

The easiest way to create a service principal is with the single Azure CLI command below:

az ad sp create-for-rbac

The command results in the following output:

{
  "appId": "APP_ID",
  "displayName": "azure-cli-2023-01-06-11-18-45",
  "password": "PASSWORD",
  "tenant": "TENANT_ID"
}

If the service principal needs access to, let’s say, Azure Key Vault, you could use the following command to grant that access:

APP_ID="appId from output above"
SUBSCRIPTION_ID="your subscription id"
RESOURCE_GROUP="your resource group"
KEYVAULT_NAME="short name of your key vault"

az role assignment create --assignee $APP_ID \
  --role "Key Vault Secrets User" \
  --scope "/subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RESOURCE_GROUP/providers/Microsoft.KeyVault/vaults/$KEYVAULT_NAME"

The next step is to configure your application to use the service principal and its secret to obtain an Azure AD token (or credential) that can be passed to Azure Key Vault to retrieve secrets or keys. That means you need to find a secure way to store the service principal secret with your application, which is something you want to avoid.

In a Python app, you can use the ClientSecretCredential class and pass your Azure tenant id, the service principal appId (or client Id) and the secret. You can then use the resulting credential with a SecretClient like in the snippet below.

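from azure.identity import ClientSecretCredential
from azure.keyvault.secrets import SecretClient

# Create a credential object from the tenant ID, app (client) ID and secret
credential = ClientSecretCredential(tenant_id, client_id, client_secret)

# Create a SecretClient using the credential
client = SecretClient(vault_url=VAULT_URL, credential=credential)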

Other languages and frameworks, such as JavaScript and C#, have similar libraries to reach the same result.

This is quite easy to do but again, where do you store the service principal’s secret securely?

The command az ad sp create-for-rbac also creates an App Registration (and Enterprise application) in Azure AD:

Azure AD App Registration

The secret (or password) for our service principal is partly displayed above. As you can see, it expires a year from now (blog post written on January 6th, 2023). You will need to update the secret and your application when that time comes, preferably before that. We all know what expiring secrets and certificates give us: an app that’s not working because we forgot to update the secret or certificate!

💡 Note that one year is the default. You can set the number of years with the --years parameter in az ad sp create-for-rbac.

💡 There will always be cases where managed identities are not supported such as connecting 3rd party systems to Azure. However, it should be clear that whenever managed identity is supported, use it to provide your app with the credentials it needs.

In what follows, we will explain managed identities in general, and system-assigned managed identity in particular. Another blog post will discuss user-assigned managed identity.

Managed Identities Explained

Azure Managed Identities allow you to authenticate to Azure resources without the need to store credentials or secrets in your code or configuration files.

There are two types of Managed Identities:

  • system-assigned
  • user-assigned

System-assigned Managed Identities are tied to a specific Azure resource, such as a virtual machine or Azure Container App. When you enable a system-assigned identity for a resource, Azure creates a corresponding identity in the Azure Active Directory (AD) for that resource, similar to what you have seen above. This identity can be used to authenticate to any service that supports Azure AD authentication. The lifecycle of a system-assigned identity is tied to the lifecycle of the Azure resource. When the resource is deleted, the corresponding identity is also deleted. Via a special token endpoint, your code can request an access token for the resource it wants to access.

User-assigned Managed Identities, on the other hand, are standalone identities that can be associated with one or more Azure resources. This allows you to use the same identity across multiple resources and manage the identity’s lifecycle independently from the resources it is associated with. In your code, you can request an access token via the same special token endpoint. You will have to specify the appId (client Id) of the user-assigned managed identity when you request the token because multiple identities could be assigned to your Azure resource.

In summary, system-assigned Managed Identities are tied to a specific resource and are deleted when the resource is deleted, while user-assigned Managed Identities are standalone identities that can be associated with multiple resources and have a separate lifecycle.

System-assigned managed identity

Virtual machines support both system-assigned and user-assigned managed identities and make it easy to demonstrate some of the internals.

Let’s create a Linux virtual machine and enable a system-assigned managed identity. You will need an Azure subscription and be logged on with the Azure CLI. I use a Linux virtual machine here to demonstrate how it works with bash. Remember that this also works on Windows VMs and many other Azure resources such as App Services, Container Apps, and more.

Run the code below. Adapt the variables for your environment.

RG="rg-mi"
LOCATION="westeurope"
PASSWORD="oE2@pl9hwmtM"

az group create --name $RG --location $LOCATION

az vm create \
  --name vm-demo \
  --resource-group $RG \
  --image UbuntuLTS \
  --size Standard_B1s \
  --admin-username azureuser \
  --admin-password $PASSWORD \
  --assign-identity


After the creation of the resource group and virtual machine, the portal shows the system assigned managed identity in the virtual machine’s Identity section:

System assigned managed identity

We can now run some code on the virtual machine to obtain an Azure AD token for this identity that allows access to a Key Vault. Key Vault is just an example here.

We will first need to create a Key Vault and a secret. After that we will grant the managed identity access to this Key Vault. Run these commands on your own machine, not the virtual machine you just created:

# generate a somewhat random name for the key vault
KVNAME=kvdemo$RANDOM

# create with vault access policy which grants creator full access
az keyvault create --name $KVNAME --resource-group $RG

# with full access, current user can create a secret
az keyvault secret set --vault-name $KVNAME --name mysecret --value "TOPSECRET"

# show the secret; should reveal TOPSECRET
az keyvault secret show --vault-name $KVNAME --name mysecret

# switch the Key Vault from access policies to Azure RBAC authorization
az keyvault update --name $KVNAME --enable-rbac-authorization

Now we can grant the system assigned managed identity access to Key Vault via Azure RBAC. Let’s look at the identity with the command below:

az vm identity show --resource-group $RG --name vm-demo

This returns the information below. Note that principalId was also visible in the portal as Object (principal) ID. Yes, not confusing at all… 🤷‍♂️

{
  "principalId": "YOUR_PRINCIPAL_ID",
  "tenantId": "YOUR_TENANT_ID",
  "type": "SystemAssigned",
  "userAssignedIdentities": null
}

Now assign the Key Vault Secrets User role to this identity:

PRI_ID="principal ID above"
SUB_ID="Azure subscription ID"

# below, scope is the Azure Id of the Key Vault 

az role assignment create --assignee $PRI_ID \
  --role "Key Vault Secrets User" \
  --scope "/subscriptions/$SUB_ID/resourceGroups/$RG/providers/Microsoft.KeyVault/vaults/$KVNAME"

If you check the Key Vault in the portal, in IAM, you should see:

System assigned identity of VM has Secrets User role

Now we can run some code on the VM to obtain an Azure AD token to read the secret from Key Vault. SSH into the virtual machine using its public IP address with ssh azureuser@IPADDRESS. Next, use the commands below:

# install jq on the vm for better formatting; you will be asked for your password
sudo snap install jq

curl 'http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https%3A%2F%2Fvault.azure.net' -H Metadata:true | jq

It might look weird but by sending the curl request to that special IP address on the VM, you actually request an access token to access Key Vault resources (in this case, it could also be another type of resource). There’s more to know about this special IP address and the other services it provides. Check Microsoft Learn for more information.

The result of the curl command is the JSON below (nicely formatted with jq):

{
  "access_token": "ACCESS_TOKEN",
  "client_id": "CLIENT_ID",
  "expires_in": "86038",
  "expires_on": "1673095093",
  "ext_expires_in": "86399",
  "not_before": "1673008393",
  "resource": "https://vault.azure.net",
  "token_type": "Bearer"
}

Note that you did not need any secret to obtain the token. Great!
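For readers who prefer Python over curl, the sketch below builds the same request with the standard library and parses a response like the one above. The function names are hypothetical. The request is only constructed here and not sent, because the special IP address only answers from inside an Azure VM.

```python
import json
import urllib.parse
import urllib.request
from datetime import datetime, timezone

IMDS_TOKEN_URL = "http://169.254.169.254/metadata/identity/oauth2/token"


def build_token_request(resource: str) -> urllib.request.Request:
    """Build the same request as the curl command above (nothing is sent yet)."""
    query = urllib.parse.urlencode(
        {"api-version": "2018-02-01", "resource": resource}
    )
    request = urllib.request.Request(f"{IMDS_TOKEN_URL}?{query}")
    request.add_header("Metadata", "true")
    return request


def parse_token_response(body: str) -> tuple[str, datetime]:
    """Return the access token and its expiry time (UTC) from the JSON body."""
    data = json.loads(body)
    expires = datetime.fromtimestamp(int(data["expires_on"]), tz=timezone.utc)
    return data["access_token"], expires
```

On the VM, passing build_token_request("https://vault.azure.net") to urllib.request.urlopen and feeding the decoded body to parse_token_response should give you the token and the moment it expires.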

Now run the following code but first replace <YOUR VAULT NAME> with the short name of your Key Vault:

# build full URL to your Key Vault
VAULTURL="https://<YOUR VAULT NAME>.vault.azure.net"

ACCESS_TOKEN=$(curl 'http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https%3A%2F%2Fvault.azure.net' -H Metadata:true | jq -r .access_token)

curl -s "$VAULTURL/secrets/mysecret?api-version=2016-10-01" -H "Authorization: Bearer $ACCESS_TOKEN" | jq -r .value

First, we set the vault URL to the full URL including https://. Next, we retrieve the full JSON token response but use jq to only grab the access token. The -r option strips the surrounding double quotes from the value. Next, we use the Azure Key Vault REST API to read the secret with the access token for authorization. The result should be TOPSECRET! 😀

Instead of this raw curl code, which is great for understanding how it works under the hood, you can use Microsoft’s identity libraries for many popular languages. For example in Python:

from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

# Authenticate using a system-assigned managed identity
credential = DefaultAzureCredential()

# Create a SecretClient using the credential and the key vault URL
secret_client = SecretClient(vault_url="https://YOURKVNAME.vault.azure.net", credential=credential)

# Retrieve the secret
secret = secret_client.get_secret("mysecret")

# Print the value of the secret
print(secret.value)

If you are somewhat used to Python, you know you will need to install azure-identity and azure-keyvault-secrets with pip. The DefaultAzureCredential class used in the code automatically works with system-assigned managed identity in virtual machines but also other compute such as Azure Container Apps. The capabilities of this class are well explained in the docs: https://learn.microsoft.com/en-us/python/api/overview/azure/identity-readme?view=azure-python. The identity libraries for other languages work similarly.

What about Azure Arc-enabled servers?

Azure Arc-enabled servers also have a managed identity. It is used to update the properties of the Azure Arc resource in the portal. You can grant this identity access to other Azure resources such as Key Vault and then grab the token in a similar way. Similar but not quite identical. The code with curl looks like this (from the docs):

ChallengeTokenPath=$(curl -s -D - -H Metadata:true "http://127.0.0.1:40342/metadata/identity/oauth2/token?api-version=2019-11-01&resource=https%3A%2F%2Fvault.azure.net" | grep Www-Authenticate | cut -d "=" -f 2 | tr -d "[:cntrl:]")

ChallengeToken=$(cat $ChallengeTokenPath)

if [ $? -ne 0 ]; then
    echo "Could not retrieve challenge token, double check that this command is run with root privileges."
else
    curl -s -H Metadata:true -H "Authorization: Basic $ChallengeToken" "http://127.0.0.1:40342/metadata/identity/oauth2/token?api-version=2019-11-01&resource=https%3A%2F%2Fvault.azure.net"
fi

On an Azure Arc-enabled machine that runs on-premises or in other clouds, the special IP address 169.254.169.254 is not available. Instead, the token request is sent to http://localhost:40342. The call is designed to fail and respond with a Www-Authenticate header that contains the path to a file on the machine (created dynamically). Only specific users and groups on the machine are allowed to read the contents of that file. This step was added for extra security so that not every process can read the contents of this file.

The second command retrieves the contents of the file and uses it for basic authentication purposes in the second curl request. It’s the second curl request that will return the access token.
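The first step of that flow, extracting the file path from the Www-Authenticate header, can be sketched in Python as follows. The function name and the example path are hypothetical, and the header layout (a value of the form "Basic realm=PATH") is an assumption based on how the bash snippet above cuts the header at the equals sign.

```python
def challenge_path_from_header(header_value: str) -> str:
    """Extract the file path from a header value such as 'Basic realm=/path'."""
    _, separator, path = header_value.partition("=")
    if not separator or not path.strip():
        raise ValueError("No challenge path found in Www-Authenticate header")
    # strip() removes trailing control characters such as \r\n
    return path.strip()


print(challenge_path_from_header("Basic realm=/tmp/example-challenge.key\r\n"))
```

The contents of the file at that path would then be sent in the Authorization: Basic header of the second request.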

Note that this works for both Linux and Windows Azure Arc-enabled systems. It is further explained here: https://learn.microsoft.com/en-us/azure/azure-arc/servers/managed-identity-authentication.

In contrast with managed identity on Azure compute, I am not aware of support for Azure Arc in the Microsoft identity libraries. To obtain a token with Python, check the following gist with some sample code: https://gist.github.com/gbaeke/343b14305e468aa433fe90441da0cabd.

The great thing about this is that managed identity can work on servers not in Azure as long as you enable Azure Arc on them! 🎉

Conclusion

In this post, we looked at what managed identities are and zoomed in on system-assigned managed identity. Azure Managed Identities are a secure and convenient way to authenticate to Azure resources without having to store credentials in code or configuration files. Whenever you can, use managed identity instead of service principals. And as you have seen, it even works with compute that’s not in Azure, such as Azure Arc-enabled servers.

Stay tuned for the next post about user-assigned managed identity.

AKS Workload Identity Revisited

A while ago, I blogged about Workload Identity. Since then, Microsoft simplified the configuration steps and added support for managed identities, in addition to app registrations.

But first, let’s take a step back. Why do you need something like workload identity in the first place? Take a look at the diagram below.

Workloads (deployed in a container or not) often need to access Azure AD protected resources. In the diagram, the workload in the container wants to read secrets from Azure Key Vault. The recommended option is to use managed identity and grant that identity the required role in Azure Key Vault. Now your code just needs to obtain credentials for that managed identity.

In Kubernetes, that last part presents a challenge. There needs to be a mechanism to map such a managed identity to a pod and allow code in the container to obtain an Azure AD authentication token. The Azure AD Pod Identity project was a way to solve this but as of 24/10/2022, AAD Pod Identity is deprecated. It is now replaced by Workload Identity. It integrates with native Kubernetes capabilities to federate with external identity providers such as Azure AD. It has the following advantages:

  • Not an AKS feature, it’s a Kubernetes feature (other cloud, on-premises, edge); similar functionality exists for GKE for instance
  • Scales better than AAD Pod Identity
  • No need for custom resource definitions
  • No need to run pods that intercept IMDS (instance metadata service) traffic; instead, there are webhook pods that run when pods are created/updated

If the above does not make much sense, check https://learn.microsoft.com/en-us/azure/aks/use-azure-ad-pod-identity. But don’t use it OK? 😉

At a basic level, Workload Identity works as follows:

  • Your AKS cluster is configured to issue tokens. Via an OIDC (OpenID Connect) discovery document, published by AKS, Azure AD can validate the tokens it receives from the cluster.
  • A Kubernetes service account is created and properly annotated and labeled. Pods are configured to use the service account via the serviceAccount field.
  • The Azure Managed Identity is configured with Federated credentials. The federated credential contains a link to the OIDC discovery document (Cluster Issuer URL) and configures the namespace and service account used by the Kubernetes pod. That generates a subject identifier like system:serviceaccount:namespace_name:service_account_name.
  • Tokens can now be generated for the configured service account and swapped for an Azure AD token that can be picked up by your workload.
  • A Kubernetes mutating webhook is the glue that makes all of this work. It ensures the token is mapped to a file in your container and sets needed environment variables.
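The subject identifier mentioned above is just a formatted string. A tiny sketch with a hypothetical helper name shows how it is put together from the namespace and service account name:

```python
def federated_subject(namespace: str, service_account: str) -> str:
    """Build the subject that the federated credential must match."""
    if not namespace or not service_account:
        raise ValueError("namespace and service account name are required")
    return f"system:serviceaccount:{namespace}:{service_account}"


print(federated_subject("default", "sademo"))
```

If the namespace or service account used by the pod does not produce exactly this subject, the token exchange with Azure AD will not succeed.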

Creating a cluster with OIDC and Workload Identity

Create a basic cluster with one worker node and both features enabled. You need an Azure subscription and the Azure CLI. Ensure the prerequisites are met and that you are logged in with az login. Run the following in a Linux shell:

RG=your_resource_group
CLUSTER=your_cluster_name

az aks create -g $RG -n $CLUSTER --node-count 1 --enable-oidc-issuer \
  --enable-workload-identity --generate-ssh-keys

After deployment, find the OIDC Issuer URL with:

export AKS_OIDC_ISSUER="$(az aks show -n $CLUSTER -g $RG --query "oidcIssuerProfile.issuerUrl" -otsv)"

When you add /.well-known/openid-configuration to that URL, you will see something like:

OIDC discovery document

The field jwks_uri contains a link to key information, used by AAD to verify the tokens issued by Kubernetes.
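If you want to look at the discovery document from code instead of the browser, a minimal standard-library sketch could look like this. The function names are hypothetical, and the issuer URL has to come from the az aks show command above.

```python
import json
import urllib.request


def discovery_url(issuer_url: str) -> str:
    """Append the well-known path, tolerating a trailing slash on the issuer."""
    return issuer_url.rstrip("/") + "/.well-known/openid-configuration"


def fetch_jwks_uri(issuer_url: str) -> str:
    """Download the discovery document and return its jwks_uri field."""
    with urllib.request.urlopen(discovery_url(issuer_url)) as response:
        document = json.load(response)
    return document["jwks_uri"]
```

Calling fetch_jwks_uri with the value of AKS_OIDC_ISSUER should return the link to the key information.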

In earlier versions of Workload Identity, you had to install a mutating admission webhook to project the Kubernetes token to a volume in your workload. In addition, the webhook also injected several environment variables:

  • AZURE_CLIENT_ID: client ID of an AAD application or user-assigned managed identity
  • AZURE_TENANT_ID: tenant ID of Azure subscription
  • AZURE_FEDERATED_TOKEN_FILE: the path to the federated token file; you can do cat $AZURE_FEDERATED_TOKEN_FILE to see the token. Note that this is the token issued by Kubernetes, not the exchanged AAD token (exchanging the token happens in your code). The token is a JWT. You can use https://jwt.io to examine it:

Decoded JWT issued by Kubernetes
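If you would rather not paste a token into a website, the payload can also be decoded locally. The sketch below uses only the standard library and a hypothetical function name. It does not verify the signature, so use it for inspection only.

```python
import base64
import json


def decode_jwt_payload(token: str) -> dict:
    """Decode the payload of a JWT without verifying its signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("Expected header.payload.signature")
    payload = parts[1]
    payload += "=" * (-len(payload) % 4)  # restore stripped base64 padding
    return json.loads(base64.urlsafe_b64decode(payload))
```

Inside the pod, you could read the file that AZURE_FEDERATED_TOKEN_FILE points to and pass its contents to decode_jwt_payload to see claims such as the issuer and the subject.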

But I am digressing… In the current implementation, you do not have to install the mutating webhook yourself. When you enable workload identity with the CLI, the webhook is installed automatically. In kube-system, you will find pods starting with azure-wi-webhook-controller-manager. The webhook kicks in whenever you create or update a pod. The end result is the same. You get the projected token + the environment variables.

Creating a service account

Ok, now we have a cluster with OIDC and workload identity enabled. We know how to retrieve the issuer URL and we learned we do not have to install anything else to make this work.

You will have to configure the pods you want a token for. Not every pod has containers that need to authenticate to Azure AD. To configure your pods, you first create a Kubernetes service account. This is a standard service account. To learn about service accounts, check my YouTube video.

apiVersion: v1
kind: ServiceAccount
metadata:
  annotations:
    azure.workload.identity/client-id: CLIENT ID OF MANAGED IDENTITY
  labels:
    azure.workload.identity/use: "true"
  name: sademo
  namespace: default

The label ensures that the mutating webhook will do its thing when a pod uses this service account. We also indicate the managed identity we want a token for by specifying its client ID in the annotation.

Note: you need to create the managed identity yourself and grab its client id. Use the following commands:

RG=your_resource_group
IDENTITY=your_chosen_identity_name
LOCATION=your_azure_location  # e.g. westeurope

export SUBSCRIPTION_ID="$(az account show --query "id" -otsv)"

az identity create --name $IDENTITY --resource-group $RG \
  --location $LOCATION --subscription $SUBSCRIPTION_ID

export USER_ASSIGNED_CLIENT_ID="$(az identity show -n $IDENTITY -g $RG --query "clientId" -otsv)"

echo $USER_ASSIGNED_CLIENT_ID

The last command prints the id to use in the service account azure.workload.identity/client-id annotation.

Creating a pod that uses the service account

Let’s create a deployment that deploys pods with an Azure CLI image:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: azcli-deployment
  namespace: default
  labels:
    app: azcli
spec:
  replicas: 1
  selector:
    matchLabels:
      app: azcli
  template:
    metadata:
      labels:
        app: azcli
    spec:
      # needs to refer to service account used with federation
      serviceAccount: sademo
      containers:
        - name: azcli
          image: mcr.microsoft.com/azure-cli:latest
          command:
            - "/bin/bash"
            - "-c"
            - "sleep infinity"

Above, the important line is serviceAccount: sademo. When the pod is created or modified, the mutating webhook will check the service account and its annotations. If it is configured for workload identity, the webhook will do its thing: projecting the Kubernetes token file and setting the environment variables:

The webhook did its work 😉
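
A quick way to check from inside a container that the webhook did its work is to look for the three variables listed earlier. This is a small sketch of such a check; the function name is my own.

```python
import os

REQUIRED_VARS = ("AZURE_CLIENT_ID", "AZURE_TENANT_ID", "AZURE_FEDERATED_TOKEN_FILE")


def missing_workload_identity_vars(env) -> list:
    """Return the names of the expected variables that are absent or empty."""
    return [name for name in REQUIRED_VARS if not env.get(name)]


missing = missing_workload_identity_vars(os.environ)
if missing:
    print("missing:", ", ".join(missing))
else:
    print("all workload identity variables are set")
```

If anything is reported as missing, check the service account the pod uses and its label and annotation.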

How to verify it works?

We can use the Azure CLI support for federated tokens as follows:

az login --federated-token "$(cat $AZURE_FEDERATED_TOKEN_FILE)" \
--service-principal -u $AZURE_CLIENT_ID -t $AZURE_TENANT_ID

After running the command, the error below appears:

Oh no…

Clearly, something is wrong: we forgot to configure the managed identity for federation. When we present our Kubernetes token, Azure AD needs information to validate it before it can return an AAD token.

Use the following command to create a federated credential on the user-assigned managed identity you created earlier:

RG=your_resource_group
IDENTITY=your_chosen_identity_name
AKS_OIDC_ISSUER=your_oidc_issuer
SANAME=sademo

az identity federated-credential create --name fic-sademo \
  --identity-name $IDENTITY \
  --resource-group $RG --issuer ${AKS_OIDC_ISSUER} \
  --subject system:serviceaccount:default:$SANAME

After running the above command, the Azure Managed Identity has the following configuration:

Federated credentials on the Managed Identity

More than one credential is possible. Click on the name of the federated credential. You will see:

Details of the federated credential

Above, the OIDC Issuer URL is set to point to our cluster. We expect a token with a subject identifier (sub) of system:serviceaccount:default:sademo. You can check the decoded jwt earlier in this post to see that the sub field in the token issued by Kubernetes matches the one above. It needs to match or the process will fail.
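
The matching rule can be sketched in a few lines of Python. This only illustrates the comparison; it is not how Azure AD implements it. It assumes an exact string comparison of the iss and sub claims, and the claim values are placeholders.

```python
def expected_subject(namespace: str, service_account: str) -> str:
    """Build the subject Kubernetes puts in a service account token."""
    return f"system:serviceaccount:{namespace}:{service_account}"


def matches_federated_credential(claims: dict, issuer: str, subject: str) -> bool:
    """Return True when the token's iss and sub equal the configured values."""
    return claims.get("iss") == issuer and claims.get("sub") == subject


# Placeholder claims, as you would get from decoding the projected token
claims = {
    "iss": "https://example.invalid/issuer-id/",
    "sub": "system:serviceaccount:default:sademo",
}
subject = expected_subject("default", "sademo")
print(matches_federated_credential(claims, "https://example.invalid/issuer-id/", subject))
```

If you move the workload to another namespace or service account, the subject changes and you need an additional federated credential.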

Now we can run the command again:

az login --federated-token "$(cat $AZURE_FEDERATED_TOKEN_FILE)" \
--service-principal -u $AZURE_CLIENT_ID -t $AZURE_TENANT_ID

You will be logged in to the Azure CLI with the managed identity credentials:

But what about your own apps?

Above, we used the Azure CLI. The most recent versions (>= 2.30.0) support federated credentials and use MSAL. But what about your custom code?

The code below is written in Python and uses the Python Azure identity client library with DefaultAzureCredential. This code works with managed identity in Azure Container Apps or Azure App Service and was not modified. Here’s the code for reference:

import threading
import os
import logging
import time
import signal
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential

from azure.appconfiguration.provider import (
    AzureAppConfigurationProvider,
    SettingSelector,
    AzureAppConfigurationKeyVaultOptions
)

logging.basicConfig(encoding='utf-8', level=logging.WARNING)

def get_config(endpoint):
  selects = {SettingSelector(key_filter=f"myapp:*", label_filter="prd")}
  trimmed_key_prefixes = {f"myapp:"}
  key_vault_options = AzureAppConfigurationKeyVaultOptions(secret_resolver=retrieve_secret)
  app_config = {}
  try:
    app_config = AzureAppConfigurationProvider.load(
            endpoint=endpoint, credential=CREDENTIAL, selects=selects, key_vault_options=key_vault_options, 
            trimmed_key_prefixes=trimmed_key_prefixes)
  except Exception as ex:
    logging.error(f"error loading app config: {ex}")

  return app_config

def run():
    try:
      global CREDENTIAL 
      CREDENTIAL = DefaultAzureCredential(exclude_visual_studio_code_credential=True)
    except Exception as ex:
      logging.error(f"error setting credentials: {ex}")

    endpoint = os.getenv('AZURE_APPCONFIGURATION_ENDPOINT')

    if not endpoint:
        logging.error("Environment variable 'AZURE_APPCONFIGURATION_ENDPOINT' not set")

    app_config =  {}
    while True:
        if not app_config:
            logging.warning("trying to load app config")
            app_config = get_config(endpoint)
        else:
            config_value=app_config['appkey']
            logging.warning(f"doing useful work with {config_value}")
            # if key exists in app_config, do something with it
            if 'mysecret' in app_config:
                logging.warning(f"and hush hush, there's a secret: {app_config['mysecret']}")
        time.sleep(5)


class GracefulKiller:
  kill_now = False
  def __init__(self):
    signal.signal(signal.SIGINT, self.exit_gracefully)
    signal.signal(signal.SIGTERM, self.exit_gracefully)

  def exit_gracefully(self, *args):
    self.kill_now = True


def retrieve_secret(uri):
    # start with None so the return below works even when the lookup fails
    secret_value = None
    try:
        # uri is in format: https://<keyvaultname>.vault.azure.net/secrets/<secretname>
        # retrieve key vault uri and secret name from uri
        vault_uri = "https://" + uri.split('/')[2]
        secret_name = uri.split('/')[-1]
        logging.warning(f"Retrieving secret {secret_name} from {vault_uri}...")

        # retrieve the secret from Key Vault; CREDENTIAL was set globally
        secret_client = SecretClient(vault_url=vault_uri, credential=CREDENTIAL)

        # get secret value from Key Vault
        secret_value = secret_client.get_secret(secret_name).value

    except Exception as ex:
        logging.error(f"error retrieving secret: {ex}")

    return secret_value

# main function
def main():
    # create a daemon thread
    t = threading.Thread(daemon=True, target=run, name="worker")
    t.start()
    

    killer = GracefulKiller()
    while not killer.kill_now:
        time.sleep(1)

    logging.info("Doing some important cleanup before exiting")
    logging.info("Gracefully exiting")


if __name__ == "__main__":
    main()

On Docker Hub, the gbaeke/worker:1.0.0 image runs this code. The following manifest runs the code on Kubernetes with the same managed identity as the Azure CLI example (same service account):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: worker
  namespace: default
  labels:
    app: worker
spec:
  replicas: 1
  selector:
    matchLabels:
      app: worker
  template:
    metadata:
      labels:
        app: worker
    spec:
      # needs to refer to service account used with federation
      serviceAccount: sademo
      containers:
        - name: worker
          image: gbaeke/worker:1.0.0
          env:
            - name: AZURE_APPCONFIGURATION_ENDPOINT
              value: https://ac-appconfig-vr6774lz3bh4i.azconfig.io

Note that the code tries to connect to Azure App Configuration. The managed identity has been given the App Configuration Data Reader role on a specific instance. The code tries to read the value of key myapp:appkey with label prd from that instance:

App Config key and values

To make the code work, the environment variable AZURE_APPCONFIGURATION_ENDPOINT is set to the URL of the App Config instance.

In the container logs, we can see that the value was successfully retrieved:

Log stream of worker

And yes, the code just works! It successfully connected to App Config and retrieved the value. The environment variables, set by the webhook discussed earlier, make this work, together with the Python Azure identity library!
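
To give an idea of what the identity library does with those environment variables, the sketch below builds (but does not send) the token request that exchanges the Kubernetes token for an AAD token. It assumes the public cloud endpoint login.microsoftonline.com and the OAuth 2.0 client credentials flow with a client assertion; the tenant, client ID, token and scope are placeholders. The real library does more (caching, retries, other authority hosts), so treat this as an illustration only.

```python
from urllib.parse import urlencode


def token_request(env: dict, kubernetes_token: str, scope: str):
    """Build the URL and form fields that swap a Kubernetes token for an AAD token."""
    # Assumption: public Azure cloud authority host
    url = f"https://login.microsoftonline.com/{env['AZURE_TENANT_ID']}/oauth2/v2.0/token"
    form = {
        "grant_type": "client_credentials",
        "client_id": env["AZURE_CLIENT_ID"],
        "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
        "client_assertion": kubernetes_token,
        "scope": scope,
    }
    return url, form


# Placeholder values; in a pod these come from the variables set by the webhook
env = {"AZURE_TENANT_ID": "my-tenant-id", "AZURE_CLIENT_ID": "my-client-id"}
url, form = token_request(env, "kubernetes-token-goes-here", "https://vault.azure.net/.default")

print(url)
print(urlencode(form))  # body of the POST request
```

With DefaultAzureCredential you never write this yourself, which is exactly why the unmodified code works on AKS.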

Conclusion

Workload Identity works like a charm and is relatively easy to configure. At the time of writing (end of November 2022), I guess we are pretty close to general availability and we finally will have a fully supported managed identity solution for AKS and beyond!

A quick look at Azure App Configuration and the Python Provider

When developing an application, it is highly likely that it needs to be configured with all sorts of settings. A simple list of key/value pairs is usually all you need. Some of the values can be read by anyone (e.g., a public URL) while some values should be treated as secrets (e.g., a connection string).

Azure App Configuration is a service to centrally manage these settings in addition to feature flags. In this post, we will look at storing and retrieving application settings and keep feature flags for another time. I will also say App Config instead of App Configuration to save some keystrokes. 😉

We will do the following:

  • Retrieve key-value pairs for multiple applications and environments from one App Config instance
  • Use Key Vault references in App Config and retrieve these from Key Vault directly
  • Use the Python provider client to retrieve key-value pairs and store them in a Python dictionary

Why use App Configuration at all?

App Configuration helps by providing a fully managed service to store configuration settings for your applications separately from your code. Storing configuration separate from code is a best practice that most developers should follow.

Although you could store configuration values in files, using a service like App Config provides some standardization within or across developer teams.

Some developers store both configuration values and secrets in Key Vault. Although that works, App Config is way more flexible in organizing the settings and retrieving lists of settings with key and label filters. If you need to work with more than a few settings, I would recommend using a combination of App Config and Key Vault.

In what follows, I will show how we store settings for multiple applications and environments in the same App Config instance. Some of these settings will be Key Vault references.

Read https://learn.microsoft.com/en-us/azure/azure-app-configuration/overview before continuing to know more about App Config.

Provisioning App Config

Provisioning App Configuration is very easy from the portal or the Azure CLI. With the Azure CLI, use the following commands to create a resource group and an App Configuration instance in that group:

az group create -n RESOURCEGROUP -l LOCATION
az appconfig create -g RESOURCEGROUP  -n APPCONFIGNAME -l LOCATION

After deployment, we can check the portal and navigate to Configuration Explorer.

App Configuration in the Azure Portal

In Configuration Explorer, you can add the configuration values for your apps. They are just key/value pairs but they can be further enriched with labels, content types, and tags.

Note that there is a Free and a Standard tier of App Config. See https://azure.microsoft.com/en-us/pricing/details/app-configuration/ for more information. In production, you should use the Standard tier.

Storing configuration and secrets for multiple apps and environments

To store configuration values for multiple applications, you will have to identify the application in the key. App Configuration, oddly, has no knowledge of applications. For example, a key could be app1:setting1. You decide on the separator between the app name (app1 here) and its setting (setting1). In your code, you can easily query all settings for your app with a key filter (e.g., “app1:*”). I will show an example of using a key filter later with the Python provider.

If you want to have different values for a key per environment (like dev, prd, etc…), you can add a label for each environment. To retrieve all settings for an environment, you can use a label filter. I will show an example of using a label filter later.
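
The idea behind key and label filters can be illustrated locally with a small Python sketch. It does not talk to App Config; it filters an in-memory list of sample settings (app2 and all values are made up) the way a key filter such as app1:* combined with a label filter would.

```python
# Sample settings, purely illustrative
settings = [
    {"key": "app1:setting1", "label": "dev", "value": "value1"},
    {"key": "app1:setting1", "label": "prd", "value": "value2"},
    {"key": "app2:setting1", "label": "dev", "value": "other"},
]


def key_matches(key: str, key_filter: str) -> bool:
    """A trailing * means 'starts with'; anything else is an exact match."""
    if key_filter.endswith("*"):
        return key.startswith(key_filter[:-1])
    return key == key_filter


def select(settings, key_filter: str, label_filter: str) -> dict:
    """Return key/value pairs that match both the key filter and the label."""
    return {
        s["key"]: s["value"]
        for s in settings
        if key_matches(s["key"], key_filter) and s["label"] == label_filter
    }


print(select(settings, "app1:*", "dev"))
```

The same key appears once per label, so the label filter decides which value ends up in the result.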

Suppose you want to use app1:setting1 in two environments: dev and prd. How do you create the key-value pairs? One way is to use the Azure CLI. You can also create them with the portal or from Python, C#, etc… With the CLI:

az appconfig kv set --name APPCONFIGNAME  --key app1:setting1 --value "value1" --label dev

APPCONFIGNAME is the name of your App Config instance. Just the name, not the full URL. For the prd environment:

az appconfig kv set --name APPCONFIGNAME  --key app1:setting1 --value "value2" --label prd

In Configuration Explorer, you will now see:

app1:setting1 for two environments (via labels)

For more examples of using the Azure CLI, see https://learn.microsoft.com/en-us/azure/azure-app-configuration/scripts/cli-work-with-keys.

In addition to these plain key-value pairs, you can also create Key Vault references. Let’s create one from the portal. In Configuration Explorer, click + Create and select Key Vault reference. You will get the following UI that allows you to create the reference. Make sure you have a Key Vault with a secret called dev-mysecret if you want to follow along. Below, set the label to dev. I forgot that in the screenshot below:

Creating a Key Vault Reference

Above, I am using the same naming convention for the key in App Config: app1:mysecret. Notice though that the secret I am referencing in Key Vault contains the environment and a dash (-) before the actual secret name. If you use one Key Vault per app instead of a Key Vault per app and environment, you will have to identify the environment in the secret name in some way.

After creating the reference, you will see the following in Configuration Explorer:

Configuration Explorer with one Key Vault reference

Note that the Key Vault reference has a content type. The content type is application/vnd.microsoft.appconfig.keyvaultref+json;charset=utf-8. You can use the content type in your code to know if the key contains a reference to a Key Vault secret. That reference will be something like https://kv-app1-geba.vault.azure.net/secrets/dev-mysecret. You can then use the Python SDK for Azure Key Vault to retrieve the secret from your code. Azure App Config will not do that for you.
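
As a sketch of how code could use the content type, the functions below recognize a Key Vault reference and extract the secret URI. I am assuming here that the value of such a setting is a small JSON document with a uri field; the function names are my own.

```python
import json

KEY_VAULT_REF_MEDIA_TYPE = "application/vnd.microsoft.appconfig.keyvaultref+json"


def is_key_vault_reference(content_type) -> bool:
    """Compare on the media type only, ignoring parameters such as charset."""
    media_type = (content_type or "").split(";")[0].strip().lower()
    return media_type == KEY_VAULT_REF_MEDIA_TYPE


def secret_uri(value: str) -> str:
    """Extract the secret URI, assuming the value is JSON like {"uri": "..."}."""
    return json.loads(value)["uri"]


content_type = "application/vnd.microsoft.appconfig.keyvaultref+json;charset=utf-8"
value = '{"uri": "https://kv-app1-geba.vault.azure.net/secrets/dev-mysecret"}'

if is_key_vault_reference(content_type):
    print(secret_uri(value))
```

Settings without a content type, or with another one, are simply treated as plain values.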

You can use content types in other ways as well. For example, you could store a link to a storage account blob and use a content type that informs your code it needs to retrieve the blob from the account. Of course, you will need to write code to retrieve the blob. App Config only contains the reference.

Reading settings

There are many ways to read settings from App Config. If you need them in an Azure Pipeline, for instance, you can use the Azure App Configuration task to pull keys and values from App Config and set them as Azure pipeline variables.

If you deploy your app to Kubernetes and you do not want to read the settings from your code, you can integrate App Configuration with Helm. See https://learn.microsoft.com/en-us/azure/azure-app-configuration/integrate-kubernetes-deployment-helm for more information.

In most cases though, you will want to read the settings directly from your code. There is an SDK for several languages, including Python. The SDK has all the functionality you need to read and write settings.

Next to the Python SDK, there is also a Python provider which is optimized to read settings from App Config and store them in a Python dictionary. The provider has several options to automatically trim app names from keys and to automatically retrieve a secret from Key Vault if the setting in App Config is a Key Vault reference.

To authenticate to App Config, the default is access keys with a connection string. You can find the connection string in the Portal:

App Config Connection string for read/write or just read

You can also use Azure AD (it’s always enabled) and disable access keys. In this example, I will use a connection string to start with:

Before we connect and retrieve the values, ensure you install the provider first:

pip install azure-appconfiguration-provider

Above, use pip or pip3 depending on your installation of Python.

In your code, ensure the proper imports:

from azure.appconfiguration.provider import (
    AzureAppConfigurationProvider,
    SettingSelector,
    AzureAppConfigurationKeyVaultOptions
)
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential

To authenticate to Azure Key Vault with Azure AD, we can use DefaultAzureCredential():

try:
    CREDENTIAL = DefaultAzureCredential(exclude_visual_studio_code_credential=True)
except Exception as ex:
    print(f"error setting credentials: {ex}")

Note: on my machine, I had an issue with the VS Code credential feature so I turned that off.

Next, use a SettingSelector from the provider to provide a key filter and label filter. I want to retrieve key-value pairs for an app called app1 and an environment called dev:

app = 'app1'
env = 'dev'
selects = {SettingSelector(key_filter=f"{app}:*", label_filter=env)}

Next, when I retrieve the key-value pairs, I want to strip app1: from the keys:

trimmed_key_prefixes = {f"{app}:"}
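
To show the effect of trimming, here is a plain Python sketch that strips a prefix from the keys of a dictionary. It only illustrates the outcome; the provider does this for you and its actual implementation may differ. The sample values are made up.

```python
def trim_prefixes(pairs: dict, prefixes) -> dict:
    """Return a copy of pairs with the first matching prefix removed from each key."""
    trimmed = {}
    for key, value in pairs.items():
        for prefix in prefixes:
            if key.startswith(prefix):
                key = key[len(prefix):]
                break
        trimmed[key] = value
    return trimmed


pairs = {"app1:setting1": "value1", "app1:mysecret": "not-a-real-secret"}
print(trim_prefixes(pairs, {"app1:"}))
```

After trimming, your code can ask for setting1 without knowing the app prefix.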

In addition, I want the provider to automatically go to Key Vault and retrieve the secret:

key_vault_options = AzureAppConfigurationKeyVaultOptions(secret_resolver=retrieve_secret)

retrieve_secret refers to a function you need to write to retrieve the secret and add custom logic. There are other options as well.

def retrieve_secret(uri):
    # start with None so the return below works even when the lookup fails
    secret_value = None
    try:
        # uri is in format: https://<keyvaultname>.vault.azure.net/secrets/<secretname>
        # retrieve key vault uri and secret name from uri
        vault_uri = "https://" + uri.split('/')[2]
        secret_name = uri.split('/')[-1]
        print(f"Retrieving secret {secret_name} from {vault_uri}...")

        # retrieve the secret from Key Vault; CREDENTIAL was set globally
        secret_client = SecretClient(vault_url=vault_uri, credential=CREDENTIAL)

        # get secret value from Key Vault
        secret_value = secret_client.get_secret(secret_name).value

    except Exception as ex:
        print(f"error retrieving secret: {ex}")

    return secret_value
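
Splitting the URI on / works for the format in the comment, but a reference can also point to a specific secret version, in which case the last segment is the version and not the name. A slightly more defensive way to parse the URI, as a sketch with a function name of my own and a made-up version string:

```python
from urllib.parse import urlparse


def split_secret_uri(uri: str):
    """Split a Key Vault secret URI into (vault_url, secret_name, version or None)."""
    parsed = urlparse(uri)
    parts = [p for p in parsed.path.split("/") if p]
    if len(parts) < 2 or parts[0] != "secrets":
        raise ValueError(f"not a Key Vault secret URI: {uri}")
    vault_url = f"{parsed.scheme}://{parsed.netloc}"
    version = parts[2] if len(parts) > 2 else None
    return vault_url, parts[1], version


print(split_secret_uri("https://kv-app1-geba.vault.azure.net/secrets/dev-mysecret"))
print(split_secret_uri("https://kv-app1-geba.vault.azure.net/secrets/dev-mysecret/abc123"))
```

The vault URL and secret name can then be passed to SecretClient and get_secret as in the function above.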

Now that we have all the options, we can retrieve the key-value pairs.

connection_string = 'YOURCONNSTR'
app_config = AzureAppConfigurationProvider.load(
    connection_string=connection_string, selects=selects, key_vault_options=key_vault_options, 
    trimmed_key_prefixes=trimmed_key_prefixes)

print(app_config)

Now we have a Python dictionary app_config with all key-value pairs for app1 and environment dev. The key-value pairs are a mix of plain values from App Config and Key Vault.

You can now use this dictionary in your app in whatever way you like.

If you would like to use the same CREDENTIAL to connect to App Config, you can also use:

endpoint = 'APPCONFIGNAME.azconfig.io' # no https://
app_config = AzureAppConfigurationProvider.load(
    endpoint=endpoint, credential=CREDENTIAL, selects=selects, key_vault_options=key_vault_options, 
    trimmed_key_prefixes=trimmed_key_prefixes)

Ensure the credential you use has the App Configuration Data Reader role to read the key-value pairs.

Here’s all the code in a gist: https://gist.github.com/gbaeke/9b075a87a1198cdcbcc2b2028492085b. Ensure you have the key-value pairs as above and provide the connection string to the connection_string variable.

Conclusion

In this post, we showed how to retrieve key-value pairs with the Python provider from one App Config instance for multiple applications and environments.

The application is stored as a prefix in the key (app1:). The environment is a label (e.g., dev), allowing us to have the same setting with different values per environment.

Some keys can contain a reference to Key Vault to allow your application to retrieve secrets from Key Vault as well. I like this approach to have a list of all settings for an app and environment, where the value of the key can be an actual value or a reference to some other entity like a secret, a blob, or anything else.

First steps with Crossplane

Image Source: crossplane.io

Although Crossplane has been around for a while, I never got around to trying it. Crossplane has many capabilities. However, in this post, I will focus on the following aspects:

  • Installing Crossplane on a Kubernetes cluster (AKS); you can install on a local cluster as well (e.g., k3s, kind, minikube, …) but then you would need Azure Arc for Kubernetes to install the microsoft.flux extension (I will be using GitOps with Flux via that extension)
  • Adding and configuring providers for Azure and Kubernetes: providers allow you to deploy to Azure and Kubernetes (and much more) from Crossplane
  • Deploying Azure infrastructure with Crossplane using a fully declarative GitOps approach

Introduction

Crossplane basically allows you to build a control plane that you or your teams can use to deploy infrastructure and applications. This control plane is built on Kubernetes. For example, to deploy an Azure resource group with Crossplane, I would create the YAML file below and apply it with kubectl apply -f filename.yaml.

This is, in essence, a fully declarative approach to deploying Azure infrastructure using Kubernetes. There are other projects, such as the Azure Service Operator v2, that do something similar.

apiVersion: azure.jet.crossplane.io/v1alpha2
kind: ResourceGroup
metadata:
  name: rg-crossplane
spec:
  forProvider:
    location: "westeurope"
    tags:
      provisioner: crossplane
  providerConfigRef:
    name: default

In order to enable this functionality, you need the following:

  • Install Crossplane on your Kubernetes cluster
  • Add a provider that can create Azure resources; the example above uses the jet provider for Azure (more about providers later)
  • Configure the provider with credentials; in this case Azure credentials

In a diagram:

Install Crossplane from git with Flux on AKS; deploy an Azure resource group and another AKS cluster from Crossplane; create a namespace on that new cluster

Combination with GitOps

Although you can install and configure Crossplane manually and just use kubectl to add custom resources, I wanted to add Crossplane and custom resources using GitOps. To that end, I am using Azure Kubernetes Service (AKS) with the microsoft.flux extension. For more information to enable and install the extension, see my Flux v2 quick guide.

⚠️ The git repository I am using with Flux v2 and Crossplane is here: https://github.com/gbaeke/crossplane/tree/blogpost. This refers to the blogpost branch, which should match the content of this post. The main branch might be different.

The repo contains several folders that match Flux kustomizations:

  • infra folder: installs Crossplane and Azure Key Vault to Kubernetes; an infra kustomization will point to this folder
  • secrets folder: uses Azure Key Vault to Kubernetes to create a Kubernetes secret from a secret in Azure Key Vault; the secrets kustomization will point to this folder
  • crossplane-apps folder: installs Azure resources and Kubernetes resources with the respective Crossplane providers; the apps kustomization will point to this folder

Note: if you do not know what Flux kustomizations are and how Flux works, do check my Flux playlist: https://www.youtube.com/playlist?list=PLG9qZAczREKmCq6on_LG8D0uiHMx1h3yn. The videos look at the open source version of Flux and not the microsoft.flux extension. To learn more about that extension, see https://www.youtube.com/watch?v=w_eoJbgDs3g.

Installing Crossplane

The infra kustomization installs Crossplane and Azure Key Vault to Kubernetes. The latter is used to sync a secret from Key Vault that contains credentials for the Crossplane Azure provider. More details are in the diagram below:

As noted above, the installation of Crossplane is done with Flux. First, there is the HelmRepository resource that adds the Crossplane Helm repository to Flux.

apiVersion: source.toolkit.fluxcd.io/v1beta2
kind: HelmRepository
metadata:
  namespace: config-infra
  name: crossplane
spec:
  interval: 1m0s
  url: https://charts.crossplane.io/stable

Next, there is the HelmRelease that installs Crossplane. Important: target namespace is crossplane-system (bottom line):

apiVersion: helm.toolkit.fluxcd.io/v2beta1
kind: HelmRelease
metadata:
  name: crossplane
  namespace: config-infra
spec:
  chart:
    spec:
      chart: crossplane
      reconcileStrategy: ChartVersion
      sourceRef:
        kind: HelmRepository
        name: crossplane
        namespace: config-infra
  install:
    createNamespace: true
  interval: 1m0s
  targetNamespace: crossplane-system

For best results, in the YAML above, set the namespace of the resource to the namespace you use with the AKS k8s-configuration. The resources to install Azure Key Vault to Kubernetes are similar.

To install the Crossplane jet provider for Azure:

---
apiVersion: pkg.crossplane.io/v1alpha1
kind: ControllerConfig
metadata:
  name: jet-azure-config
  labels:
    app: crossplane-provider-jet-azure
spec:
  image: crossplane/provider-jet-azure-controller:v0.9.0
  args: ["-d"]
---
apiVersion: pkg.crossplane.io/v1
kind: Provider
metadata:
  name: crossplane-provider-jet-azure
spec:
  package: crossplane/provider-jet-azure:v0.9.0
  controllerConfigRef:
    name: jet-azure-config

Above, debugging is turned on for the provider. This is optional. The provider actually runs in the crossplane-system namespace:

jet provider

The provider is added via the Provider resource (second resource in the YAML manifest).

We can now create the AKS k8s-configuration, which creates a Flux source and a kustomization:

RG=your AKS resource group
CLUSTER=your AKS cluster name (to install Crossplane to)

az k8s-configuration flux create -g $RG -c $CLUSTER \
  -n cluster-config --namespace config-infra -t managedClusters \
  --scope cluster -u https://github.com/gbaeke/crossplane \
  --branch main  \
  --kustomization name=infra path=./infra prune=true

The Flux source will be the repo specified with -u. There is one kustomization: infra. Pruning is turned on. With pruning, removing manifests from the repo results in removing them from Kubernetes.

The k8s-configuration should result in:

Don’t mind the other Kustomizations; will be added later; this is the GitOps view in the properties of the cluster in the Azure Portal

Crossplane is now installed with two providers. We can now configure the Azure provider with credentials.

Configuring Azure Credentials

You need to create a service principal by following the steps in https://crossplane.io/docs/v1.9/cloud-providers/azure/azure-provider.html. I compacted the resulting JSON with:

cat <path-to-JSON> | jq -c

The output of the above command was added to Key Vault:

Azure creds in Key Vault
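
If jq is not available, the same compaction can be done with a few lines of Python. This is a sketch; the field names and values in the sample are placeholders, not real credentials.

```python
import json


def compact_json(text: str) -> str:
    """Re-serialize JSON without insignificant whitespace, similar to jq -c."""
    return json.dumps(json.loads(text), separators=(",", ":"), ensure_ascii=False)


pretty = """{
  "clientId": "00000000-0000-0000-0000-000000000000",
  "tenantId": "11111111-1111-1111-1111-111111111111"
}"""

print(compact_json(pretty))
```

The result is a single line that is easy to paste into a Key Vault secret.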

The Key Vault I am using uses the Azure RBAC permission model. Ensure that the AKS cluster’s kubelet identity has at least the Key Vault Secrets User role. It is a user-assigned managed identity with a name like clustername-agentpool.

To actually create a Kubernetes secret from this Key Vault secret, the secrets folder in the git repo contains the manifest below:

apiVersion: spv.no/v2beta1
kind: AzureKeyVaultSecret
metadata:
  name: azure-creds 
  namespace: crossplane-system
spec:
  vault:
    name: kvgebadefault # name of key vault
    object:
      name: azure-creds # name of the akv object
      type: secret # akv object type
  output: 
    secret: 
      name: azure-creds # kubernetes secret name
      dataKey: creds # key to store object value in kubernetes secret

This creates a Kubernetes secret in the crossplane-system namespace with name azure-creds and a key creds that holds the credentials JSON.

Secret as seen in k9s
the decoded secret as shown in k9s
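
Without k9s, you can decode the secret yourself. Values in the data section of a Kubernetes secret are base64-encoded, so a small Python sketch like the one below does the job. The secret here is built in memory with a placeholder value; in practice you would load the JSON output of kubectl get secret azure-creds -n crossplane-system -o json.

```python
import base64
import json


def decode_secret_key(secret: dict, key: str) -> str:
    """Decode one key from the data section of a Kubernetes Secret object."""
    return base64.b64decode(secret["data"][key]).decode("utf-8")


# Placeholder secret, shaped like the JSON that kubectl returns
placeholder_creds = '{"clientId":"placeholder"}'
secret = {
    "kind": "Secret",
    "metadata": {"name": "azure-creds", "namespace": "crossplane-system"},
    "data": {"creds": base64.b64encode(placeholder_creds.encode()).decode()},
}

decoded = decode_secret_key(secret, "creds")
print(json.loads(decoded)["clientId"])
```

The decoded value should be the same compact JSON that was stored in Key Vault.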

To add the secret(s) as an extra kustomization, run:

RG=your AKS resource group
CLUSTER=your AKS cluster name

az k8s-configuration flux create -g $RG -c $CLUSTER \
  -n cluster-config --namespace config-infra -t managedClusters \
  --scope cluster -u https://github.com/gbaeke/crossplane \
  --branch main  \
  --kustomization name=infra path=./infra prune=true \
  --kustomization name=secrets path=./secrets prune=true dependsOn=["infra"]

Note that the secrets kustomization is dependent on the infra kustomization. After running this command, ensure the secret is in the crossplane-system namespace. The k8s-configuration uses the same source but now has two kustomizations.

Deploying resources with the Jet provider for Azure

Before explaining how to create Azure resources, a note on providers. As a novice Crossplane user, I started with the following Azure provider: https://github.com/crossplane-contrib/provider-azure. This works well but it is not so simple for contributors to ensure the provider is up-to-date with the latest and greatest Azure features. For example, if you deploy AKS, you cannot use managed identity, the cluster uses availability sets etc…

To improve this, Terrajet was created. It is a code generation framework that generates Crossplane CRDs (custom resource definitions) and sets up the provider to use Terraform. Building on top of Terraform is an advantage because it is more up-to-date with new cloud features. That is the reason why this post uses the jet provider. When we later create an AKS cluster, it will take advantage of managed identity and other newer features.

Note: there is also a Terraform provider that can take Terraform HCL to do anything you want; we are not using that in this post

Ok, let’s create a resource group and deploy AKS. First, we have to configure the provider with Azure credentials. The crossplane-apps folder contains a file called jet-provider-config.yaml:

apiVersion: azure.jet.crossplane.io/v1alpha1
kind: ProviderConfig
metadata:
  name: default
spec:
  credentials:
    source: Secret
    secretRef:
      namespace: crossplane-system
      name: azure-creds
      key: creds

The above ProviderConfig tells the provider to use the credentials in the Kubernetes secret we created earlier. We know we are configuring the jet provider from the apiVersion: azure.jet.crossplane.io/v1alpha1.

With that out of the way, we can create the resource group and AKS cluster. Earlier in this post, the YAML to create the resource group was already shown. To create a basic AKS cluster called clu-cp in this group, aks.yaml is used:

apiVersion: containerservice.azure.jet.crossplane.io/v1alpha2
kind: KubernetesCluster
metadata:
  name: clu-cp
spec:
  writeConnectionSecretToRef:
    name: example-kubeconfig
    namespace: crossplane-system
  forProvider:
    location: "westeurope"
    resourceGroupNameRef:
      name: rg-crossplane
    dnsPrefix: "clu-cp"
    defaultNodePool:
      - name: default
        nodeCount: 1
        vmSize: "Standard_D2_v2"
    identity:
      - type: "SystemAssigned"
    tags:
      environment: dev
  providerConfigRef:
    name: default

Above, we refer to our resource group by name (resourceGroupNameRef) and we write the credentials to our cluster to a secret (writeConnectionSecretToRef). That secret will contain keys with the certificate and private key, but also a kubeconfig key with a valid kubeconfig file. We can use that later to connect and deploy to the cluster.

To see an example of connecting to the deployed cluster and creating a namespace, see k8s-provider-config.yaml and k8s-namespace.yaml in the repo. The resource k8s-provider-config.yaml will use the example-kubeconfig secret created above to connect to the AKS cluster that we created in the previous steps.

To create a kustomization for the crossplane-apps folder, run the following command:

RG=your-aks-resource-group   # replace with your AKS resource group
CLUSTER=your-aks-cluster     # replace with your AKS cluster name

az k8s-configuration flux create -g "$RG" -c "$CLUSTER" \
  -n cluster-config --namespace config-infra -t managedClusters \
  --scope cluster -u https://github.com/gbaeke/crossplane \
  --branch main \
  --kustomization name=infra path=./infra prune=true \
  --kustomization name=secrets path=./secrets prune=true dependsOn=\["infra"\] \
  --kustomization name=apps path=./crossplane-apps prune=true dependsOn=\["secrets"\]

The crossplane-apps folder does not contain a kustomization.yaml file. Any manifest you drop in it will be applied to the cluster! The k8s-kustomization now has the same source but three kustomizations:

infra, secrets and apps kustomizations
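The dependsOn values in the command above form a simple chain: secrets waits for infra, and apps waits for secrets. The sketch below only models that ordering with Python's graphlib module (Python 3.9 or later); it says nothing about how Flux implements it internally, and the helper name reconcile_order is invented for this example.

```python
from graphlib import TopologicalSorter

# Each kustomization mapped to the kustomizations it depends on,
# mirroring the dependsOn values passed to the az command.
depends_on = {
    "infra": [],
    "secrets": ["infra"],
    "apps": ["secrets"],
}


def reconcile_order(graph: dict) -> list:
    """Return an order in which every kustomization comes after its dependencies."""
    return list(TopologicalSorter(graph).static_order())


print(reconcile_order(depends_on))
```

If you add a fourth kustomization later, adding it to the dictionary is a quick way to check that you did not introduce a dependency cycle: graphlib raises CycleError in that case.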

After a while, an AKS cluster clu-cp should be deployed to resource group rg-crossplane:

AKS deployed by Crossplane running on another AKS cluster

To play around with this, I recommend using Visual Studio Code and the GitOps extension. After you make a change locally and push it to main, you can speed things up by manually reconciling the git repository and the apps kustomization:

Reconcile the GitRepository source and kustomization from the GitOps extension for Visual Studio Code

Conclusion

In this post, we looked at installing and configuring Crossplane on AKS via GitOps and the microsoft.flux extension. In addition, we deployed a few Azure resources with Crossplane and its jet provider for Azure. We only scratched the surface here but I hope this gets you started quickly when evaluating Crossplane for yourself.

Learn to use the Dapr authorization middleware

Based on a customer conversation, I decided to look into the Dapr middleware components. More specifically, I wanted to understand how the OAuth 2.0 middleware works that enables the Authorization Code flow.

In the Authorization Code flow, the client is redirected to an authorization URL (https://login.microsoftonline.com/{tenant}/oauth2/authorize) where the user provides credentials interactively. That makes the flow unsuitable for non-interactive, service-to-service scenarios. After sign-in, the client obtains an authorization code, which is a temporary code. That code is then handed to your app, which exchanges it for an access token. With the access token, the authenticated user can access your app.
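To give an idea of what that first redirect looks like, here is a small Python sketch that builds an authorization URL. Dapr constructs this URL for you, so you never have to write this code; it is here for illustration only. The query parameter names (response_type, client_id, redirect_uri, state) come from the OAuth 2.0 specification, while the function name and the placeholder values are assumptions made for this example.

```python
from urllib.parse import parse_qs, urlencode, urlparse


def build_authorize_url(tenant_id: str, client_id: str,
                        redirect_uri: str, state: str) -> str:
    """Build the URL a browser is sent to at the start of the Authorization Code flow."""
    base = f"https://login.microsoftonline.com/{tenant_id}/oauth2/authorize"
    query = urlencode({
        "response_type": "code",       # ask for an authorization code
        "client_id": client_id,        # from the app registration
        "redirect_uri": redirect_uri,  # where the code is sent back to
        "state": state,                # opaque value to correlate request and response
    })
    return f"{base}?{query}"


# Placeholder values, not real identifiers.
url = build_authorize_url("TENANTID", "CLIENTID",
                          "https://app.11.12.13.14.nip.io", "random-state")
print(url)
```

After the user signs in, the authorization server redirects the browser back to redirect_uri with a code parameter, and that code is what gets exchanged for the access token.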

Instead of coding this OAuth flow in your app, we will let the Dapr middleware handle all of that work. Our app can then pick up the token from an HTTP header. When there is a token, access to the app is granted. Otherwise, Dapr (well, the Dapr sidecar next to your app) redirects your client to the authorization server to get a code.

Let’s take a look at how this all works with Azure Active Directory. Other authorization servers are supported as well: Facebook, GitHub, Google, and more.

What we will build

Some experience with Kubernetes, deployments, ingresses, Ingress Controllers and Dapr is required.

If you think the explanation below can be improved, or I have made errors, do let me know. Let’s go…

Create an app registration

Using Azure AD means we need an app registration! Other platforms have similar requirements.

First, create an app registration following this quick start. In the first step, give the app a name and, for this demo, just select Accounts in this organizational directory only. The redirect URI will be configured later so just click Register.

After following the quick start, you should have:

  • the client ID and client secret: will be used in the Dapr component
  • the Azure AD tenant ID: used in the auth and token URLs in the Dapr component; Dapr needs to know where to redirect to and where to exchange the authorization code for an access token

App registration in my Azure AD Tenant

There is no need for your app to know about these values. All work is done by Dapr and Dapr only!

We will come back to the app registration later to create a redirect URI.

Install an Ingress Controller

We will use an Ingress Controller to provide access to our app’s Dapr sidecar from the Internet, using HTTP.

In this example, we will install ingress-nginx. Use the following commands (requires Helm):

helm upgrade --install ingress-nginx ingress-nginx \
  --repo https://kubernetes.github.io/ingress-nginx \
  --namespace ingress-nginx --create-namespace

Although you will find articles about daprizing your Ingress Controller, we will not do that here. We will use the Ingress Controller simply as a way to provide HTTP access to the Dapr sidecar of our app. We do not want Dapr-to-Dapr gRPC traffic between the Ingress Controller and our app.

When ingress-nginx is installed, grab the public IP address of the service that it uses. Use kubectl get svc -n ingress-nginx. I will use the IP address with nip.io to construct a host name like app.11.12.13.14.nip.io. The nip.io service resolves such a host name to the IP address in the name automatically.
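If you prefer to script this step, the sketch below shows one way to do it in Python. It assumes you pass in the JSON of the Ingress Controller service (kubectl get svc with -o json) and reads the IP address from status.loadBalancer.ingress. Both function names are invented for this example.

```python
import ipaddress
import json


def load_balancer_ip(service_json: str) -> str:
    """Return the first external IP of a LoadBalancer service given as JSON."""
    svc = json.loads(service_json)
    ingress = svc.get("status", {}).get("loadBalancer", {}).get("ingress") or []
    if not ingress or "ip" not in ingress[0]:
        raise ValueError("the service has no external IP address (yet)")
    return ingress[0]["ip"]


def nip_host(ip: str, prefix: str = "app") -> str:
    """Build a nip.io host name that resolves to the given IPv4 address."""
    ipaddress.IPv4Address(ip)  # raises ValueError when ip is not a valid IPv4 address
    return f"{prefix}.{ip}.nip.io"


print(nip_host("11.12.13.14"))
```

With a default Helm install, the service is usually called ingress-nginx-controller, but check the output of kubectl get svc -n ingress-nginx to be sure.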

The host name will be used in the ingress and the Dapr component. In addition, use the host name to set the redirect URI of the app registration: https://app.11.12.13.14.nip.io. For example:

Added a platform configuration for a web app and set the redirect URI

Note that we are using https here. We will configure TLS on the ingress later.

Install Dapr

Install the Dapr CLI on your machine and run dapr init -k. This requires a working Kubernetes context to install Dapr to your cluster. I am using a single-node AKS cluster in Azure.

Create the Dapr component and configuration

Below is the Dapr middleware component we need. The component is called myauth. Give it any name you want. The name will later be used in a Dapr configuration that is, in turn, used by the app.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: myauth
spec:
  type: middleware.http.oauth2
  version: v1
  metadata:
  - name: clientId
    value: "CLIENTID of your app reg"
  - name: clientSecret
    value: "CLIENTSECRET that you created on the app reg"
  - name: authURL
    value: "https://login.microsoftonline.com/TENANTID/oauth2/authorize"
  - name: tokenURL
    value: "https://login.microsoftonline.com/TENANTID/oauth2/token"
  - name: redirectURL
    value: "https://app.YOUR-IP.nip.io"
  - name: authHeaderName
    value: "authorization"
  - name: forceHTTPS
    value: "true"
scopes:
- super-api

Replace YOUR-IP with the public IP address of the Ingress Controller. Also replace the TENANTID.

With the information above, Dapr can exchange the authorization code for an access token. Note that the client secret is hard-coded in the manifest, which is fine for a demo only. It is recommended to use a Kubernetes secret instead.
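As a starting point for moving the client secret into a Kubernetes secret, here is a minimal Python sketch that generates the Secret manifest as JSON. The secret name oauth-secret, the key clientSecret and the function name are all assumptions for this example; pick whatever names fit your setup.

```python
import base64
import json


def oauth_secret_manifest(client_secret: str,
                          name: str = "oauth-secret",
                          namespace: str = "default") -> dict:
    """Return a Kubernetes Secret manifest that holds the OAuth client secret."""
    encoded = base64.b64encode(client_secret.encode("utf-8")).decode("ascii")
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": name, "namespace": namespace},
        "type": "Opaque",
        # Values under data must be base64-encoded.
        "data": {"clientSecret": encoded},
    }


# Placeholder value; kubectl apply -f accepts JSON as well as YAML.
print(json.dumps(oauth_secret_manifest("CLIENTSECRET"), indent=2))
```

In the component, the clientSecret entry would then use a secretKeyRef with the secret's name and key instead of a value. Check the Dapr documentation on referencing secrets in components for the exact syntax in your Dapr version.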

The component on its own is not enough. We need to create a Dapr configuration that references it:

apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
  name: auth
spec:
  tracing:
    samplingRate: "1"
  httpPipeline:
    handlers:
    - name: myauth # reference the oauth component here
      type: middleware.http.oauth2    

Note that the configuration is called auth. Our app will need to use this configuration later, via an annotation on the Kubernetes pods.

Both manifests can be submitted to the cluster using kubectl apply -f. It is OK to use the default namespace for this demo. Keep the configuration and component in the same namespace as your app.

Deploy the app

The app we will deploy is super-api, which has a /source endpoint to dump all HTTP headers. When authentication is successful, the authorization header will be in the list.

Here is deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: super-api-deployment
  labels:
    app: super-api
spec:
  replicas: 1
  selector:
    matchLabels:
      app: super-api
  template:
    metadata:
      labels:
        app: super-api
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "super-api"
        dapr.io/app-port: "8080"
        dapr.io/config: "auth" # refer to Dapr config
        dapr.io/sidecar-listen-addresses: "0.0.0.0" # important
    spec:
      securityContext:
        runAsUser: 10000
        runAsNonRoot: true
      containers:
        - name: super-api
          image: ghcr.io/gbaeke/super:1.0.7
          securityContext:
            readOnlyRootFilesystem: true
            capabilities:
              drop:
                - all
          args: ["--port=8080"]
          ports:
            - name: http
              containerPort: 8080
              protocol: TCP
          env:
            - name: IPADDRESS
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: WELCOME
              value: "Hello from the Super API on AKS!!! IP is: $(IPADDRESS)"
            - name: LOG
              value: "true"       
          resources:
            requests:
              memory: "64Mi"
              cpu: "50m"
            limits:
              memory: "64Mi"
              cpu: "50m"
          livenessProbe:
            httpGet:
              path: /healthz
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 15
          readinessProbe:
            httpGet:
              path: /readyz
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 15

Note the annotations in the manifest above:

  • dapr.io/enabled: injects the Dapr sidecar in the pods
  • dapr.io/app-id: a Dapr app needs an id; a service will automatically be created with that id and -dapr appended; in our case the name will be super-api-dapr; our ingress will forward traffic to this service
  • dapr.io/app-port: Dapr will need to call endpoints in our app (after authentication in this case) so it needs the port that our app container uses
  • dapr.io/config: refers to the configuration we created above, which enables the http middleware defined by our OAuth component
  • dapr.io/sidecar-listen-addresses: ⚠️ needs to be set to “0.0.0.0”; without this setting, we will not be able to send requests to the Dapr sidecar directly from the Ingress Controller

Submit the app manifest with kubectl apply -f.

Check that the pod has two containers: the Dapr sidecar and your app container. Also check that there is a service called super-api-dapr. There is no need to create your own service. Our ingress will forward traffic to this service.

Create an ingress

In the same namespace as the app (default), create an ingress. This requires the ingress-nginx Ingress Controller we installed earlier:

apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: super-api-ingress
  namespace: default
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /
spec:
  ingressClassName: nginx
  tls:
    - hosts:
      - app.YOUR-IP.nip.io
      secretName: tls-secret 
  rules:
  - host: app.YOUR-IP.nip.io
    http:
      paths:
      - pathType: Prefix
        path: "/"
        backend:
          service:
            name: super-api-dapr
            port: 
              number: 80

Replace YOUR-IP with the public IP address of the Ingress Controller.

For this to work, you also need a secret with a certificate. Use the following commands:

openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout tls.key -out tls.crt -subj "/CN=app.YOUR-IP.nip.io"
kubectl create secret tls tls-secret --key tls.key --cert tls.crt

Replace YOUR-IP as above.

Testing the configuration

Let’s use the browser to connect to the /source endpoint. You will need to use the Dapr invoke API because the request will be sent to the Dapr sidecar. You need to speak a language that Dapr understands! The sidecar will just call http://localhost:8080/source and send back the response. It will only call the endpoint when authentication has succeeded; otherwise, you will be redirected.

Use the following URL in the browser. It’s best to use an incognito session or private window.

https://app.20.103.17.249.nip.io/v1.0/invoke/super-api/method/source

Your browser will warn you of security risks because the certificate is not trusted. Proceed anyway! 😉

Note: we could use some URL rewriting on the ingress to avoid having to use /v1.0/invoke etc… You can also use different URL formats. See the docs.
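The URL format used above is easy to get wrong, so here is a small Python sketch that builds it from its parts. The helper names are made up for this example; the /v1.0/invoke/APP-ID/method/METHOD path and the -dapr suffix of the service name are the ones used in this post.

```python
from urllib.parse import quote


def dapr_service_name(app_id: str) -> str:
    """Name of the Kubernetes service that Dapr creates for an app id."""
    return f"{app_id}-dapr"


def dapr_invoke_url(host: str, app_id: str, method: str, scheme: str = "https") -> str:
    """Build a Dapr service invocation URL for a method (endpoint) of an app."""
    method = method.lstrip("/")  # accept both "source" and "/source"
    return f"{scheme}://{host}/v1.0/invoke/{quote(app_id, safe='')}/method/{quote(method)}"


print(dapr_service_name("super-api"))
print(dapr_invoke_url("app.20.103.17.249.nip.io", "super-api", "/source"))
```

The first helper gives the name the ingress backend points to; the second gives the URL you type in the browser.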

You should get an authentication screen which indicates that the Dapr configuration is doing its thing:

Redirection to the authorize URL

After successful authentication, you should see the response from the /source endpoint of super-api:

Response from /source

The response contains an Authorization header. The header contains a JWT after the word Bearer. You can paste that JWT in https://jwt.io to see its content. We can only access the app with a valid token. That’s all we do in this case, ensuring only authenticated users can access our app.
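If you would rather not paste a token into a website, the claims can also be inspected locally. The Python sketch below splits the header value, takes the middle part of the JWT and base64url-decodes it. The function names are invented for this example, and note that it does not verify the signature, so use it for inspection only and never to decide whether a token can be trusted.

```python
import base64
import json


def bearer_token(header_value: str) -> str:
    """Extract the token from an Authorization header value like 'Bearer <token>'."""
    scheme, _, token = header_value.partition(" ")
    if scheme.lower() != "bearer" or not token.strip():
        raise ValueError("expected a header value of the form 'Bearer <token>'")
    return token.strip()


def jwt_claims(token: str) -> dict:
    """Decode the payload of a JWT WITHOUT verifying its signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("a JWT consists of three dot-separated parts")
    payload = parts[1]
    payload += "=" * (-len(payload) % 4)  # restore the stripped base64 padding
    return json.loads(base64.urlsafe_b64decode(payload))
```

Calling jwt_claims(bearer_token(value)) with the value of the Authorization header returns a dictionary of claims, similar to the payload pane on jwt.io.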

Conclusion

In this article, we used Dapr to secure access to an app without having to modify the app itself. The source code of super-api was not changed in any way to enable this functionality. Via a component and a configuration, we instructed our app’s Dapr sidecar to do all this work for us. App endpoints such as /source are only called when there is a valid token. When there is such a token, it is saved in a header of your choice.

It is important to note that we have to send HTTP requests to our app’s sidecar for this to work. To enable this, we instructed the sidecar to listen on all IP addresses of the pod, not just 127.0.0.1. That allows us to send HTTP requests to the service that Dapr creates for the app. The ingress forwards requests to the Dapr service directly. That also means that you have to call your endpoint via the Dapr invoke API. I admit that can be confusing in the beginning. 😉

Note that, at the time of this writing (June 2022), the OAuth2 middleware in Dapr is in an alpha state.

Publish your AKS Ingress Controller over Azure Private Link

In a previous article, I wrote about the AKS Azure Cloud Provider and its support for Azure Private Link. In summary, the functionality allows for the following:

  • creation of a Kubernetes service of type LoadBalancer
  • via an annotation on the service, the Azure Cloud Provider creates an internal load balancer (ILB) instead of a public one
  • via extra annotations on the service, the Azure Cloud Provider creates an Azure Private Link Service for the Internal Load Balancer (🆕)

In the article, I used Azure Front Door as an example to securely publish the Kubernetes service to the Internet via private link.

Although you could publish all your services using the approach above, that would not be very efficient. In the real world, you would use an Ingress Controller like ingress-nginx to avoid the overhead of one service of type LoadBalancer per application.

Publish the Ingress Controller with Private Link Service

In combination with the Private Link Service functionality, you can just publish an Ingress Controller like ingress-nginx. That would look like the diagram below:

In the above diagram, our app does not use a LoadBalancer service. Instead, the service is of the ClusterIP type. To publish the app externally, an ingress resource is created to publish the app via ingress-nginx. The ingress resource refers to the ClusterIP service super-api. There is nothing new about this. This is Kubernetes ingress as usual:

apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: super-api-ingress
spec:
  ingressClassName: nginx
  rules:
  - host: www.myingress.com
    http:
      paths:
      - pathType: Prefix
        path: "/"
        backend:
          service:
            name: super-api
            port: 
              number: 80

Note that I am using the host www.myingress.com as an example here. In Front Door, I will need to configure a custom host header that matches the ingress host. Whenever Front Door connects to the Ingress Controller via Private Link Service, the host header will be sent to allow ingress-nginx to route traffic to the super-api service.

In the diagram, you can see that it is the ingress-nginx service that needs the annotations to create a private link service. When you install ingress-nginx with Helm, just supply a values file with the following content:

controller:
  service:
    annotations:
      service.beta.kubernetes.io/azure-load-balancer-internal: "true"
      service.beta.kubernetes.io/azure-pls-create: "true"
      service.beta.kubernetes.io/azure-pls-ip-configuration-ip-address: IP_IN_SUBNET
      service.beta.kubernetes.io/azure-pls-ip-configuration-ip-address-count: "1"
      service.beta.kubernetes.io/azure-pls-ip-configuration-subnet: SUBNET_NAME
      service.beta.kubernetes.io/azure-pls-name: PLS_NAME
      service.beta.kubernetes.io/azure-pls-proxy-protocol: "false"
      service.beta.kubernetes.io/azure-pls-visibility: '*'

Via the above annotations, the service created by the ingress-nginx Helm chart will use an internal load balancer. In addition, a private link service for the load balancer will be created.

Front Door Config

The Front Door configuration is almost the same as before, except that we need to configure a host header on the origin:

Host header config in Front Door origin

When I issue the command below (FQDN is the Front Door endpoint):

 curl https://aks-agbyhedaggfpf5bs.z01.azurefd.net/source

the response is the following:

Hello from Super API
Source IP and port: 10.244.0.12:40244
X-Forwarded-For header: 10.224.10.20

All headers:

HTTP header: X-Real-Ip: [10.224.10.20]
HTTP header: X-Forwarded-Scheme: [http]
HTTP header: Via: [2.0 Azure]
HTTP header: X-Azure-Socketip: [MY HOME IP]
HTTP header: X-Forwarded-Host: [www.myingress.com]
HTTP header: Accept: [*/*]
HTTP header: X-Azure-Clientip: [MY HOME IP]
HTTP header: X-Azure-Fdid: [f76ca0ce-32ed-8754-98a9-e6c02a7765543]
HTTP header: X-Request-Id: [5fd6bb9c1a4adf4834be34ce606d980e]
HTTP header: X-Forwarded-For: [10.224.10.20]
HTTP header: X-Forwarded-Port: [80]
HTTP header: X-Original-Forwarded-For: [MY HOME IP, 147.243.113.173]
HTTP header: User-Agent: [curl/7.58.0]
HTTP header: X-Azure-Requestchain: [hops=2]
HTTP header: X-Forwarded-Proto: [http]
HTTP header: X-Scheme: [http]
HTTP header: X-Azure-Ref: [0nPGlYgAAAABefORrczaWQ52AJa/JqbBAQlJVMzBFREdFMDcxMgBmNzZjYTBjZS0yOWVkLTQ1NzUtOThhOS1lNmMwMmE5NDM0Mzk=, 20220612T140100Z-nqz5dv28ch6b76vb4pnq0fu7r40000001td0000000002u0a]

The /source endpoint of super-api dumps all the HTTP headers. Note the following:

  • X-Real-Ip: is the address used for NATting by the private link service
  • X-Azure-Fdid: is the Front Door Id that allows us to verify that the request indeed passed Front Door
  • X-Azure-Clientip: my home IP address; this is the result of setting externalTrafficPolicy: Local on the ingress-nginx service; the script I used to install ingress-nginx happened to have this value set; it is not required unless you want the actual client IP address to be reported
  • X-Forwarded-Host: the host header; the original FQDN aks-agbyhedaggfpf5bs.z01.azurefd.net cannot be seen
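The X-Azure-Fdid check mentioned above can be sketched in a few lines of Python. The parser below works on the text dump that /source returns, purely because that is the output shown in this post; in a real app you would read the header from the incoming request instead. The function names are invented for this example.

```python
import re

# Matches lines such as: HTTP header: X-Real-Ip: [10.224.10.20]
HEADER_LINE = re.compile(r"^HTTP header: ([^:]+): \[(.*)\]$")


def parse_header_dump(text: str) -> dict:
    """Turn the /source header dump into a dict with lower-cased header names."""
    headers = {}
    for line in text.splitlines():
        match = HEADER_LINE.match(line.strip())
        if match:
            headers[match.group(1).lower()] = match.group(2)
    return headers


def passed_front_door(headers: dict, expected_fdid: str) -> bool:
    """True when the X-Azure-Fdid header equals the expected Front Door Id."""
    return headers.get("x-azure-fdid", "").lower() == expected_fdid.lower()
```

Pass the complete response text to parse_header_dump and the Front Door Id of your own profile to passed_front_door. A request that did not come through your Front Door instance will either lack the header or carry a different Id.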

In the real world, you would configure a custom domain in Front Door to match the configured host header.

Conclusion

In this post, we published a Kubernetes Ingress Controller (ingress-nginx) via an internal load balancer and Azure Private Link. A service like Azure Front Door can use this functionality to provide external connectivity to the internal Ingress Controller with extra security features such as Azure WAF. You do not have to use Front Door. You can provide access to the Ingress Controller from a Private Endpoint in any network and any subscription, including subscriptions you do not control.

Although this functionality is interesting, it is neither automated nor integrated with Kubernetes ingress functionality. For that reason alone, I would not recommend using this. It does provide the foundation to create an alternative to Application Gateway Ingress Controller. The only thing that is required is to write a controller that integrates Kubernetes ingress with Front Door instead of Application Gateway. 😉
